/*
 * Decompiled with CFR 0.152.
 */
package org.hypertable.hadoop.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.hypertable.hadoop.mapreduce.KeyWritable;
import org.hypertable.hadoop.mapreduce.OutputCommitter;
import org.hypertable.thrift.ThriftClient;
import org.hypertable.thriftgen.Cell;
import org.hypertable.thriftgen.MutatorFlag;

public class OutputFormat
extends org.apache.hadoop.mapreduce.OutputFormat<KeyWritable, BytesWritable> {
    private static final Log log = LogFactory.getLog(OutputFormat.class);
    public static final String NAMESPACE = "hypertable.mapreduce.output.namespace";
    public static final String TABLE = "hypertable.mapreduce.output.table";
    public static final String MUTATOR_FLAGS = "hypertable.mapreduce.output.mutator-flags";
    public static final String MUTATOR_FLUSH_INTERVAL = "hypertable.mapreduce.output.mutator-flush-interval";

    public RecordWriter<KeyWritable, BytesWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException {
        String namespace = ctx.getConfiguration().get(NAMESPACE);
        String table = ctx.getConfiguration().get(TABLE);
        int flags = ctx.getConfiguration().getInt(MUTATOR_FLAGS, 0);
        int flush_interval = ctx.getConfiguration().getInt(MUTATOR_FLUSH_INTERVAL, 0);
        try {
            return new HypertableRecordWriter(namespace, table, flags, flush_interval);
        }
        catch (Exception e) {
            log.error((Object)e);
            throw new IOException("Unable to access RecordWriter - " + e.toString());
        }
    }

    public void checkOutputSpecs(JobContext ctx) throws IOException {
    }

    public org.apache.hadoop.mapreduce.OutputCommitter getOutputCommitter(TaskAttemptContext ctx) {
        return new OutputCommitter();
    }

    protected static class HypertableRecordWriter
    extends RecordWriter<KeyWritable, BytesWritable> {
        private ThriftClient mClient;
        private long mNamespace;
        private long mMutator;
        private String table;
        private String namespace;

        public HypertableRecordWriter(String namespace, String table, int flags, int flush_interval) throws IOException {
            try {
                this.namespace = namespace;
                this.table = table;
                this.mClient = ThriftClient.create("localhost", 38080);
                this.mNamespace = this.mClient.namespace_open(namespace);
                this.mMutator = this.mClient.mutator_open(this.mNamespace, table, flags, flush_interval);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to open thrift mutator - " + e.toString());
            }
        }

        public HypertableRecordWriter(String namespace, String table) throws IOException {
            this(namespace, table, MutatorFlag.NO_LOG_SYNC.getValue(), 0);
        }

        public HypertableRecordWriter(String namespace, String table, int flags) throws IOException {
            this(namespace, table, flags, 0);
        }

        public void close(TaskAttemptContext ctx) throws IOException {
            try {
                this.mClient.mutator_close(this.mMutator);
                this.mClient.namespace_close(this.mNamespace);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to close thrift mutator & namespace- " + e.toString());
            }
        }

        public void write(KeyWritable key, BytesWritable value) throws IOException {
            try {
                Cell cell = new Cell();
                key.convert_buffers_to_strings();
                cell.key = key;
                cell.value = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
                this.mClient.mutator_set_cell(this.mMutator, cell);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to write cell - " + e.toString());
            }
        }
    }
}

