/*
 * Decompiled with CFR 0.152.
 */
package org.hypertable.hadoop.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.hypertable.hadoop.mapreduce.OutputCommitter;
import org.hypertable.hadoop.mapreduce.OutputFormat;
import org.hypertable.thrift.ThriftClient;
import org.hypertable.thriftgen.MutatorFlag;

public class SerializedCellsOutputFormat
extends org.apache.hadoop.mapreduce.OutputFormat<NullWritable, BytesWritable> {
    private static final Log log = LogFactory.getLog(OutputFormat.class);
    public static final String NAMESPACE = "hypertable.mapreduce.output.namespace";
    public static final String TABLE = "hypertable.mapreduce.output.table";
    public static final String MUTATOR_FLAGS = "hypertable.mapreduce.output.mutator-flags";
    public static final String MUTATOR_FLUSH_INTERVAL = "hypertable.mapreduce.output.mutator-flush-interval";

    public org.apache.hadoop.mapreduce.RecordWriter<NullWritable, BytesWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException {
        String namespace = ctx.getConfiguration().get(NAMESPACE);
        String table = ctx.getConfiguration().get(TABLE);
        int flags = ctx.getConfiguration().getInt(MUTATOR_FLAGS, 0);
        int flush_interval = ctx.getConfiguration().getInt(MUTATOR_FLUSH_INTERVAL, 0);
        try {
            return new RecordWriter(namespace, table, flags, flush_interval);
        }
        catch (Exception e) {
            log.error((Object)e);
            throw new IOException("Unable to access RecordWriter - " + e.toString());
        }
    }

    public void checkOutputSpecs(JobContext ctx) throws IOException {
    }

    public org.apache.hadoop.mapreduce.OutputCommitter getOutputCommitter(TaskAttemptContext ctx) {
        return new OutputCommitter();
    }

    protected class RecordWriter
    extends org.apache.hadoop.mapreduce.RecordWriter<NullWritable, BytesWritable> {
        private ThriftClient mClient;
        private long mMutator;
        private long mNamespace;
        private String namespace;
        private String table;

        public RecordWriter(String namespace, String table, int flags, int flush_interval) throws IOException {
            try {
                this.namespace = namespace;
                this.table = table;
                this.mClient = ThriftClient.create("localhost", 38080);
                this.mNamespace = this.mClient.open_namespace(namespace);
                this.mMutator = this.mClient.open_mutator(this.mNamespace, table, flags, flush_interval);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to open thrift mutator - " + e.toString());
            }
        }

        public RecordWriter(String namespace, String table) throws IOException {
            this(namespace, table, MutatorFlag.NO_LOG_SYNC.getValue(), 0);
        }

        public RecordWriter(String namespace, String table, int flags) throws IOException {
            this(namespace, table, flags, 0);
        }

        public void close(TaskAttemptContext ctx) throws IOException {
            try {
                this.mClient.close_mutator(this.mMutator);
                this.mClient.close_namespace(this.mNamespace);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to close thrift mutator & namespace- " + e.toString());
            }
        }

        public void write(NullWritable key, BytesWritable value) throws IOException {
            try {
                this.mClient.mutator_set_cells_serialized(this.mMutator, ByteBuffer.wrap(value.getBytes()), false);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to write cell - " + e.toString());
            }
        }
    }
}

