/*
 * Decompiled with CFR 0.152.
 */
package org.hypertable.hadoop.mapred;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.hypertable.thrift.SerializedCellsWriter;
import org.hypertable.thrift.ThriftClient;
import org.hypertable.thriftgen.MutatorFlag;

public class TextTableOutputFormat
implements OutputFormat<Text, Text> {
    private static final Log log = LogFactory.getLog(TextTableOutputFormat.class);
    public static final String NAMESPACE = "hypertable.mapreduce.namespace";
    public static final String OUTPUT_NAMESPACE = "hypertable.mapreduce.output.namespace";
    public static final String TABLE = "hypertable.mapreduce.output.table";
    public static final String MUTATOR_FLAGS = "hypertable.mapreduce.output.mutator_flags";
    public static final String MUTATOR_FLUSH_INTERVAL = "hypertable.mapreduce.output.mutator_flush_interval";
    public static final int CLIENT_BUFFER_SIZE = 0xC00000;

    public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
        String namespace = job.get(OUTPUT_NAMESPACE);
        if (namespace == null) {
            namespace = job.get(NAMESPACE);
        }
        String table = job.get(TABLE);
        int flags = job.getInt(MUTATOR_FLAGS, 0);
        int flush_interval = job.getInt(MUTATOR_FLUSH_INTERVAL, 0);
        try {
            return new HypertableRecordWriter(namespace, table, flags, flush_interval);
        }
        catch (Exception e) {
            log.error((Object)e);
            throw new IOException("Unable to access RecordWriter - " + e.toString());
        }
    }

    public void checkOutputSpecs(FileSystem ignore, JobConf conf) throws IOException {
    }

    protected static class HypertableRecordWriter
    implements RecordWriter<Text, Text> {
        private ThriftClient mClient;
        private long mNamespaceId = 0L;
        private long mMutator;
        private String namespace;
        private String table;
        private Text m_line = new Text();
        private SerializedCellsWriter mCellsWriter = new SerializedCellsWriter(0xC00000);
        private static String utf8 = "UTF-8";
        private static final byte[] tab;
        private static final byte[] colon;
        private static final String colon_str;
        private static final String tab_str;

        public HypertableRecordWriter(String namespace, String table, int flags, int flush_interval) throws IOException {
            try {
                this.namespace = namespace;
                this.table = table;
                this.mClient = ThriftClient.create("localhost", 38080);
                this.mNamespaceId = this.mClient.namespace_open(namespace);
                this.mMutator = this.mClient.mutator_open(this.mNamespaceId, table, flags, flush_interval);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to open thrift mutator - " + e.toString());
            }
        }

        public HypertableRecordWriter(String namespace, String table) throws IOException {
            this(namespace, table, MutatorFlag.NO_LOG_SYNC.getValue(), 0);
        }

        public HypertableRecordWriter(String namespace, String table, int flags) throws IOException {
            this(namespace, table, flags, 0);
        }

        public void close(Reporter reporter) throws IOException {
            try {
                if (!this.mCellsWriter.isEmpty()) {
                    this.mClient.mutator_set_cells_serialized(this.mMutator, this.mCellsWriter.buffer(), false);
                    this.mCellsWriter.clear();
                }
                if (this.mNamespaceId != 0L) {
                    this.mClient.namespace_close(this.mNamespaceId);
                }
                this.mClient.mutator_close(this.mMutator);
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to close thrift mutator - " + e.toString());
            }
        }

        public void write(Text key, Text value) throws IOException {
            try {
                boolean has_timestamp;
                key.append(tab, 0, tab.length);
                this.m_line.clear();
                this.m_line.append(key.getBytes(), 0, key.getLength());
                this.m_line.append(value.getBytes(), 0, value.getLength());
                int len = this.m_line.getLength();
                int tab_count = 0;
                int tab_pos = 0;
                int found = 0;
                while (found != -1) {
                    if ((found = this.m_line.find(tab_str, found + 1)) <= 0 || ++tab_count != 1) continue;
                    tab_pos = found;
                }
                if (tab_count >= 3) {
                    has_timestamp = true;
                } else if (tab_count == 2) {
                    has_timestamp = false;
                } else {
                    throw new Exception("incorrect output line format only " + tab_count + " tabs");
                }
                byte[] byte_array = this.m_line.getBytes();
                int family_offset = 0;
                int family_length = 0;
                int qualifier_offset = 0;
                int qualifier_length = 0;
                int value_offset = 0;
                int value_length = 0;
                long timestamp = -9223372036854775806L;
                int offset = 0;
                if (has_timestamp) {
                    timestamp = Long.parseLong(Text.decode((byte[])byte_array, (int)0, (int)tab_pos));
                    offset = tab_pos + 1;
                }
                int row_offset = offset;
                tab_pos = this.m_line.find(tab_str, offset);
                int row_length = tab_pos - row_offset;
                family_offset = offset = tab_pos + 1;
                tab_pos = this.m_line.find(tab_str, offset);
                for (int i = family_offset; i < tab_pos; ++i) {
                    if (byte_array[i] != 58 || qualifier_offset != 0) continue;
                    family_length = i - family_offset;
                    qualifier_offset = i + 1;
                }
                if (qualifier_offset == 0) {
                    family_length = tab_pos - family_offset;
                } else {
                    qualifier_length = tab_pos - qualifier_offset;
                }
                value_offset = offset = tab_pos + 1;
                value_length = len - value_offset;
                if (!this.mCellsWriter.add(byte_array, row_offset, row_length, byte_array, family_offset, family_length, byte_array, qualifier_offset, qualifier_length, timestamp, byte_array, value_offset, value_length, (byte)-1)) {
                    this.mClient.mutator_set_cells_serialized(this.mMutator, this.mCellsWriter.buffer(), false);
                    this.mCellsWriter.clear();
                    if (row_length + family_length + qualifier_length + value_length + 32 > this.mCellsWriter.capacity()) {
                        this.mCellsWriter = new SerializedCellsWriter(row_length + family_length + qualifier_length + value_length + 32);
                    }
                    if (!this.mCellsWriter.add(byte_array, row_offset, row_length, byte_array, family_offset, family_length, byte_array, qualifier_offset, qualifier_length, timestamp, byte_array, value_offset, value_length, (byte)-1)) {
                        throw new IOException("Unable to add cell to SerializedCellsWriter (row='" + new String(byte_array, row_offset, row_length, "UTF-8") + "'");
                    }
                }
            }
            catch (Exception e) {
                log.error((Object)e);
                throw new IOException("Unable to write cell - " + e.toString());
            }
        }

        static {
            try {
                tab = "\t".getBytes(utf8);
                tab_str = new String(tab);
                colon = ":".getBytes(utf8);
                colon_str = new String(colon);
            }
            catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }
    }
}

