/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkMetrics;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Applies batches of replicated WAL entries to local tables.
 *
 * <p>Each entry is classified by its first KeyValue: if that KeyValue is a delete
 * marker the whole entry is turned into a single {@link Delete} and applied
 * immediately; otherwise the entry is turned into one {@link Put} per run of
 * consecutive KeyValues sharing a row, and those Puts are queued per table and
 * written once all entries of the batch have been scanned.
 *
 * <p>Tables are borrowed from an {@link HTablePool} and always handed back, even
 * when the write fails.
 */
public class ReplicationSink {
    private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
    public static final String REPLICATION_LOG_DIR = ".replogs";
    private final Configuration conf;
    private final HTablePool pool;
    private final Stoppable stopper;
    private final ReplicationSinkMetrics metrics;

    /**
     * Creates a sink backed by a table pool whose capacity is read from
     * {@code replication.sink.htablepool.capacity} (default 10).
     *
     * @param conf configuration used for the table pool
     * @param stopper kept for the lifetime of the sink; not consulted by the code in this class
     * @throws IOException declared for callers; nothing in this constructor body throws it directly
     */
    public ReplicationSink(Configuration conf, Stoppable stopper) throws IOException {
        this.conf = conf;
        this.pool = new HTablePool(this.conf, conf.getInt("replication.sink.htablepool.capacity", 10));
        this.stopper = stopper;
        this.metrics = new ReplicationSinkMetrics();
    }

    /**
     * Replicates a batch of WAL entries into the local tables.
     *
     * <p>Delete entries are applied as they are encountered; put entries are
     * grouped by table name and flushed, one {@code put(List)} call per table,
     * after the whole batch has been scanned. Entries whose edit carries no
     * KeyValues have nothing to apply and are skipped instead of failing the batch.
     *
     * @param entries the entries to apply; a {@code null} or empty array is a no-op
     * @throws IOException if a table write fails; the error is logged and rethrown
     */
    public void replicateEntries(HLog.Entry[] entries) throws IOException {
        if (entries == null || entries.length == 0) {
            return;
        }
        try {
            long totalReplicated = 0L;
            // byte[] has identity-based equals/hashCode, so the table-name key needs
            // a content comparator rather than a hash map.
            Map<byte[], List<Put>> putsByTable =
                new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
            for (HLog.Entry entry : entries) {
                WALEdit edit = entry.getEdit();
                List<KeyValue> kvs = edit.getKeyValues();
                if (kvs != null && !kvs.isEmpty()) {
                    byte[] tableName = entry.getKey().getTablename();
                    if (kvs.get(0).isDelete()) {
                        this.delete(tableName, buildDelete(kvs));
                    } else {
                        List<Put> tablePuts = putsByTable.get(tableName);
                        if (tablePuts == null) {
                            tablePuts = new ArrayList<Put>();
                            putsByTable.put(tableName, tablePuts);
                        }
                        collectPuts(kvs, tablePuts);
                    }
                }
                ++totalReplicated;
            }
            for (Map.Entry<byte[], List<Put>> tablePuts : putsByTable.entrySet()) {
                this.put(tablePuts.getKey(), tablePuts.getValue());
            }
            this.metrics.setAgeOfLastAppliedOp(entries[entries.length - 1].getKey().getWriteTime());
            this.metrics.appliedBatchesRate.inc(1);
            LOG.info("Total replicated: " + totalReplicated);
        }
        catch (IOException ex) {
            LOG.error("Unable to accept edit because:", ex);
            throw ex;
        }
    }

    /**
     * Builds one Delete from an edit whose first KeyValue is a delete marker.
     *
     * <p>The Delete targets the row and timestamp of the first KeyValue. Family
     * delete markers become {@code deleteFamily}, markers with a non-empty column
     * become {@code deleteColumn}, and markers with an empty column add nothing.
     *
     * <p>NOTE(review): the per-marker timestamps are not passed to
     * {@code deleteFamily}/{@code deleteColumn}, and every KeyValue is folded into
     * the first KeyValue's row — confirm this matches the intended semantics.
     *
     * @param kvs non-empty list of KeyValues belonging to one edit
     * @return the Delete to apply
     */
    private static Delete buildDelete(List<KeyValue> kvs) {
        KeyValue first = kvs.get(0);
        Delete delete = new Delete(first.getRow(), first.getTimestamp(), null);
        for (KeyValue kv : kvs) {
            if (kv.isDeleteFamily()) {
                delete.deleteFamily(kv.getFamily());
            } else if (!kv.isEmptyColumn()) {
                delete.deleteColumn(kv.getFamily(), kv.getQualifier());
            }
        }
        return delete;
    }

    /**
     * Converts the KeyValues of one edit into Puts and appends them to {@code out}.
     *
     * <p>A new Put is started whenever the row differs from the previous
     * KeyValue's row. Every cell is added with its own timestamp so that cells
     * whose timestamp differs from the first cell of their row are not restamped.
     *
     * @param kvs KeyValues of one edit, in edit order
     * @param out list receiving the Puts, in the order their rows first appear
     */
    private static void collectPuts(List<KeyValue> kvs, List<Put> out) {
        Put current = null;
        byte[] currentRow = null;
        for (KeyValue kv : kvs) {
            byte[] row = kv.getRow();
            if (current == null || !Bytes.equals(currentRow, row)) {
                current = new Put(row, kv.getTimestamp());
                out.add(current);
                currentRow = row;
            }
            current.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
        }
    }

    /**
     * Writes a batch of Puts to one table and counts them in the applied-ops metric.
     *
     * @param tableName name of the destination table
     * @param puts Puts to write; an empty list is a no-op
     * @throws IOException if the table write fails
     */
    private void put(byte[] tableName, List<Put> puts) throws IOException {
        if (puts.isEmpty()) {
            return;
        }
        HTableInterface table = null;
        try {
            table = this.pool.getTable(tableName);
            table.put(puts);
            this.metrics.appliedOpsRate.inc(puts.size());
        }
        finally {
            // Hand the table back to the pool whether or not the write succeeded.
            if (table != null) {
                this.pool.putTable(table);
            }
        }
    }

    /**
     * Applies one Delete to a table and counts it in the applied-ops metric.
     *
     * @param tableName name of the destination table
     * @param delete the Delete to apply
     * @throws IOException if the table write fails
     */
    private void delete(byte[] tableName, Delete delete) throws IOException {
        HTableInterface table = null;
        try {
            table = this.pool.getTable(tableName);
            table.delete(delete);
            this.metrics.appliedOpsRate.inc(1);
        }
        finally {
            // Hand the table back to the pool whether or not the write succeeded.
            if (table != null) {
                this.pool.putTable(table);
            }
        }
    }
}

