/*
 * Decompiled with CFR 0.152.
 */
package org.hypertable.examples.PerformanceTest;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
import java.util.logging.Logger;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.hypertable.AsyncComm.Comm;
import org.hypertable.AsyncComm.CommBuf;
import org.hypertable.AsyncComm.CommHeader;
import org.hypertable.AsyncComm.ConnectionHandlerFactory;
import org.hypertable.AsyncComm.DispatchHandler;
import org.hypertable.AsyncComm.Event;
import org.hypertable.AsyncComm.ReactorFactory;
import org.hypertable.Common.Error;
import org.hypertable.Common.HypertableException;
import org.hypertable.Common.ProgressMeterVertical;
import org.hypertable.Common.Usage;
import org.hypertable.examples.PerformanceTest.Message;
import org.hypertable.examples.PerformanceTest.MessageError;
import org.hypertable.examples.PerformanceTest.MessageFactory;
import org.hypertable.examples.PerformanceTest.MessageRegister;
import org.hypertable.examples.PerformanceTest.MessageSetup;
import org.hypertable.examples.PerformanceTest.MessageSummary;
import org.hypertable.examples.PerformanceTest.MessageTask;
import org.hypertable.examples.PerformanceTest.Result;
import org.hypertable.examples.PerformanceTest.Setup;
import org.hypertable.examples.PerformanceTest.Task;
import org.hypertable.thrift.ThriftClient;

public class Dispatcher {
    static final Logger log = Logger.getLogger("org.hypertable.examples");
    static String[] usage = new String[]{"", "usage: Dispatcher [OPTIONS] read|write|scan|incr <keySize> <valueSize> <totalDataSize>", "", "OPTIONS:", "  --help                  Display this help text and exit", "  --clients=<n>           Wait for <n> clients to connect", "  --cmf-file=<fname>      Load CMF data from <fname>", "  --driver=<s>            Which DB driver to use.  Valid values include:", "                          hypertable, hbase (default = hypertable)", "  --key-max=<n>           When generating random keys, modulo resulting key by <n>", "                          (default is keycount * 10)", "  --measure-latency       Measure request latency (default is false)", "  --output-dir=<dir>      Directory to write test report into (default is cwd)", "  --parallel=<n>          Run test in parallel mode by spawning <n> threads per client", "  --port=<n>              Specifies the listen port (default = 11256)", "  --timeout=<ms>          Wait for connection timeout in milliseconds", "                          (default = 5000)", "  --random                Generate random keys", "  --randomize-tasks       Randomize the task queue before starting test", "  --distribution-range=<n> Range of random number distribution.  Scaled to keyMax.", "  --scan-buffer-size=<n>  Size of scan result transfer buffer in number of bytes", "                          (default = 65K)", "  --submit-at-most=<n>    Submit no more than <n> keys, regardless of <keyCount>", "  --submit-exactly=<n>    Submit exactly <n> keys, replicating sequence generated by", "                          <keyCount> if necessary", "  --test-name=<name>      Test name used as filename prefix for summary output file", "  --value-data=<fname>    Absolute path to file, on Client machines, containing data", "                          to be randomly sampled for value content", "  --zipf                  For random order, use Zipfian distribution", "                          (default is uniform)", "", "This program is part of a performance test suite.  It acts as a", "dispatcher, handing out parcels of work to test clients.", "", null};
    static final int DEFAULT_PORT = 11256;

    public static String testTypeString(Setup.Order order, Setup.Type testType) {
        String orderStr = "";
        String typeStr = "";
        typeStr = testType == Setup.Type.READ ? "read" : (testType == Setup.Type.WRITE ? "write" : (testType == Setup.Type.SCAN ? "scan" : (testType == Setup.Type.INCR ? "incr" : "unknown")));
        if (testType != Setup.Type.SCAN) {
            orderStr = order == Setup.Order.SEQUENTIAL ? "sequential-" : (order == Setup.Order.RANDOM ? "random-" : "unknown-");
        }
        return orderStr + typeStr;
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        int port = 11256;
        long timeout = 15000L;
        int clients = 0;
        String outputDir = null;
        String testName = null;
        boolean testTypeSet = false;
        long submitExactly = -1L;
        long submitAtMost = -1L;
        long keysSubmitted = 0L;
        long totalDataSize = -1L;
        boolean randomizeTasks = false;
        String argString = "";
        Setup setup = new Setup();
        setup.tableName = "perftest";
        if (args.length == 1 && args[0].equals("--help")) {
            Usage.DumpAndExit((String[])usage);
        }
        for (int i = 0; i < args.length; ++i) {
            if (args[i].startsWith("--port=")) {
                port = Integer.parseInt(args[i].substring(7));
            } else if (args[i].startsWith("--clients=")) {
                clients = Integer.parseInt(args[i].substring(10));
            } else if (args[i].startsWith("--cmf-file=")) {
                setup.cmfFile = args[i].substring(11);
            } else if (args[i].startsWith("--driver=")) {
                setup.driver = args[i].substring(9);
            } else if (args[i].startsWith("--parallel=")) {
                setup.parallelism = Integer.parseInt(args[i].substring(11));
            } else if (args[i].startsWith("--key-max=")) {
                setup.keyMax = Long.parseLong(args[i].substring(10));
            } else if (args[i].startsWith("--submit-at-most=")) {
                submitAtMost = Long.parseLong(args[i].substring(17));
            } else if (args[i].startsWith("--submit-exactly=")) {
                submitExactly = Long.parseLong(args[i].substring(17));
            } else if (args[i].startsWith("--output-dir=")) {
                outputDir = args[i].substring(13);
            } else if (args[i].equals("--random")) {
                setup.order = Setup.Order.RANDOM;
            } else if (args[i].equals("--randomize-tasks")) {
                randomizeTasks = true;
            } else if (args[i].startsWith("--distribution-range=")) {
                setup.distributionRange = Long.parseLong(args[i].substring(21));
            } else if (args[i].equals("--timeout=")) {
                timeout = Long.parseLong(args[i].substring(10));
            } else if (args[i].startsWith("--scan-buffer-size=")) {
                setup.scanBufferSize = Integer.parseInt(args[i].substring(19));
            } else if (args[i].startsWith("--test-name=")) {
                testName = args[i].substring(12);
            } else if (args[i].startsWith("--value-data=")) {
                setup.valueData = args[i].substring(13);
            } else if (args[i].equals("--zipf")) {
                setup.distribution = Setup.Distribution.ZIPFIAN;
            } else if (!testTypeSet) {
                if (args[i].equals("read")) {
                    setup.type = Setup.Type.READ;
                } else if (args[i].equals("write")) {
                    setup.type = Setup.Type.WRITE;
                } else if (args[i].equals("scan")) {
                    setup.type = Setup.Type.SCAN;
                } else if (args[i].equals("incr")) {
                    setup.type = Setup.Type.INCR;
                } else {
                    Usage.DumpAndExit((String[])usage);
                }
                testTypeSet = true;
            } else if (setup.keySize == -1) {
                setup.keySize = Integer.parseInt(args[i]);
            } else if (setup.valueSize == -1) {
                setup.valueSize = Integer.parseInt(args[i]);
            } else if (totalDataSize == -1L) {
                totalDataSize = Long.parseLong(args[i]);
            } else {
                Usage.DumpAndExit((String[])usage);
            }
            argString = argString + args[i] + " ";
        }
        if (totalDataSize == -1L) {
            Usage.DumpAndExit((String[])usage);
        }
        setup.keyCount = totalDataSize / (long)(setup.keySize + setup.valueSize);
        if (submitExactly != -1L && submitAtMost != -1L) {
            System.out.println("ERROR:  Only one of --submit-exactly and --submit-at-most may be supplied");
            System.exit(1);
        }
        if (submitAtMost != -1L) {
            submitExactly = submitAtMost > setup.keyCount ? setup.keyCount : submitAtMost;
        } else if (submitExactly == -1L) {
            submitExactly = setup.keyCount;
        }
        if (setup.type == Setup.Type.INCR && setup.valueSize != -1) {
            System.out.println("WARNING: Value size ignored for INCR test type");
            setup.valueSize = -1;
        }
        try {
            HBaseAdmin admin;
            if (setup.driver.equals("hypertable")) {
                long namespace;
                ThriftClient client = ThriftClient.create((String)"localhost", (int)38080);
                if (!client.exists_table(namespace = client.open_namespace("/"), "perftest")) {
                    client.close_namespace(namespace);
                    System.out.println("Table 'perftest' does not exist, exiting...");
                    System.exit(1);
                }
                client.close_namespace(namespace);
            } else if (setup.driver.equals("hbase") && !(admin = new HBaseAdmin(HBaseConfiguration.create())).tableExists("perftest")) {
                System.out.println("Table 'perftest' does not exist, exiting...");
                System.exit(1);
            }
        }
        catch (Exception e) {
            System.out.println("Problem creating table - " + e.getMessage());
            System.exit(-1);
        }
        try {
            Event event;
            long end;
            long start;
            long rangesize;
            ReactorFactory.Initialize((short)2);
            Vector<Task> taskVector = new Vector<Task>();
            LinkedList<MessageTask> messageQueue = new LinkedList<MessageTask>();
            RequestHandler requestHandler = new RequestHandler(timeout);
            HandlerFactory handlerFactory = new HandlerFactory(requestHandler);
            HashMap<String, Result> resultMap = new HashMap<String, Result>();
            Comm comm = new Comm(0);
            comm.Listen(port, (ConnectionHandlerFactory)handlerFactory, (DispatchHandler)requestHandler);
            int connections = requestHandler.waitForConnections(clients);
            if (clients > 0 && connections != clients) {
                log.info("Expected " + clients + " clients, but only got " + connections);
                System.exit(1);
            }
            if (connections == 0) {
                log.info("Timed out after waiting for connections, exiting...");
                System.exit(1);
            }
            int pendingResults = connections;
            int progressReadySkip = connections;
            if (setup.keyMax == -1L) {
                setup.keyMax = setup.keyCount * 10L;
            }
            long nranges = 10 * connections;
            if (setup.type == Setup.Type.SCAN) {
                rangesize = setup.keyMax / nranges;
                start = 0L;
                end = rangesize;
                while (start < setup.keyMax) {
                    taskVector.add(new Task(start, end));
                    start = end;
                    end += rangesize;
                }
            } else {
                rangesize = submitExactly / nranges;
                while (keysSubmitted < submitExactly) {
                    start = 0L;
                    end = rangesize;
                    while (start < setup.keyCount && keysSubmitted < submitExactly) {
                        keysSubmitted += end - start;
                        taskVector.add(new Task(start, end));
                        start = end;
                        end += rangesize;
                    }
                }
            }
            if (randomizeTasks) {
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < taskVector.size(); ++i) {
                    int randi = random.nextInt(taskVector.size());
                    Task tempTask = (Task)taskVector.get(randi);
                    taskVector.set(randi, (Task)taskVector.get(0));
                    taskVector.set(0, tempTask);
                }
            }
            for (Task t : taskVector) {
                messageQueue.offer(new MessageTask(t));
            }
            MessageSetup messageSetup = new MessageSetup(setup);
            System.out.println();
            ProgressMeterVertical progress = new ProgressMeterVertical((long)messageQueue.size());
            long startTime = System.currentTimeMillis();
            while ((event = requestHandler.GetRequest()) != null) {
                CommBuf cbuf;
                Message message = MessageFactory.createMessage(event.payload);
                CommHeader header = new CommHeader();
                header.initialize_from_request_header(event.header);
                if (message.type() == Message.Type.REGISTER) {
                    MessageRegister messageRegister = (MessageRegister)message;
                    cbuf = messageSetup.createCommBuf(header);
                    int error = comm.SendResponse(event.addr, cbuf);
                    if (error != 0) {
                        throw new HypertableException(error);
                    }
                    System.out.println("Client: " + messageRegister.getClientName());
                    if (resultMap.containsKey(messageRegister.getClientName())) {
                        throw new HypertableException(-1, "Client " + messageRegister.getHostName() + " already registered");
                    }
                    resultMap.put(messageRegister.getClientName(), null);
                    continue;
                }
                if (message.type() == Message.Type.READY) {
                    int error;
                    if (progressReadySkip == 0) {
                        progress.add(1L);
                    } else {
                        --progressReadySkip;
                    }
                    if (!messageQueue.isEmpty()) {
                        message = (Message)messageQueue.remove();
                        cbuf = message.createCommBuf(header);
                    } else {
                        cbuf = new Message(Message.Type.FINISHED).createCommBuf(header);
                    }
                    if ((error = comm.SendResponse(event.addr, cbuf)) == 0) continue;
                    throw new HypertableException(error);
                }
                if (message.type() == Message.Type.SUMMARY) {
                    Result result = (Result)resultMap.remove(((MessageSummary)message).getClientName());
                    if (result != null) {
                        throw new HypertableException(-1, "Received test results from " + ((MessageSummary)message).getClientName() + " twice");
                    }
                    resultMap.put(((MessageSummary)message).getClientName(), ((MessageSummary)message).getResult());
                    if (--pendingResults != 0) continue;
                    break;
                }
                if (message.type() == Message.Type.ERROR) {
                    System.out.println(event.addr + ": " + ((MessageError)message).getMessage());
                    System.exit(1);
                    continue;
                }
                System.out.println("UNKNOWN");
            }
            progress.finished();
            long stopTime = System.currentTimeMillis();
            long itemCount = 0L;
            long requestCount = 0L;
            long elapsedMillis = 0L;
            long bytesReturned = 0L;
            long cumulativeLatency = 0L;
            for (Map.Entry pairs : resultMap.entrySet()) {
                Result result = (Result)pairs.getValue();
                if (setup.type == Setup.Type.READ || setup.type == Setup.Type.SCAN) {
                    itemCount += result.itemsReturned;
                    requestCount += result.requestCount;
                    bytesReturned += result.valueBytesReturned;
                    cumulativeLatency += result.cumulativeLatency;
                } else {
                    itemCount += result.itemsSubmitted;
                }
                elapsedMillis += result.elapsedMillis;
            }
            ArrayList<String> summary = new ArrayList<String>();
            Object summaryLine = null;
            System.out.println();
            summary.add("Test: " + Dispatcher.testTypeString(setup.order, setup.type));
            if (setup.type == Setup.Type.READ) {
                summary.add("Distribution: " + (setup.distribution == Setup.Distribution.ZIPFIAN ? "zipfian" : "uniform"));
            }
            summary.add("Driver: " + setup.driver);
            summary.add("Args: " + argString);
            summary.add("Key Count: " + setup.keyCount);
            summary.add("Key Size: " + setup.keySize);
            summary.add("Value size: " + setup.valueSize);
            summary.add("Keys Submitted: " + keysSubmitted);
            if (setup.type == Setup.Type.READ || setup.type == Setup.Type.SCAN) {
                summary.add("Items Returned: " + itemCount);
                summary.add("Value Bytes Returned: " + bytesReturned);
            }
            summary.add("Clients: " + connections);
            summary.add("Start time: " + new Date(startTime).toString());
            summary.add("Finish time: " + new Date(stopTime).toString());
            summary.add("Wall time: " + (double)(stopTime - startTime) / 1000.0 + " s");
            summary.add("Test time: " + elapsedMillis / 1000L + " s");
            bytesReturned = setup.type == Setup.Type.READ || setup.type == Setup.Type.SCAN ? (bytesReturned += itemCount * (long)setup.keySize) : itemCount * (long)(setup.keySize + setup.valueSize);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            if (setup.type == Setup.Type.INCR) {
                ps.format("Client Throughput: %.2f incr/s", (double)keysSubmitted / (double)(elapsedMillis / 1000L));
            } else {
                ps.format("Client Throughput: %.2f queries/s", (double)keysSubmitted / (double)(elapsedMillis / 1000L));
                summary.add(baos.toString());
                baos.reset();
                ps.format("Client Throughput: %.2f bytes/s", (double)bytesReturned / (double)(elapsedMillis / 1000L));
            }
            summary.add(baos.toString());
            baos.reset();
            if (setup.type == Setup.Type.INCR) {
                ps.format("Aggregate Throughput: %.2f incr/s", (double)keysSubmitted / (double)(elapsedMillis / 1000L) * (double)connections);
            } else {
                ps.format("Aggregate Throughput: %.2f queries/s", (double)keysSubmitted / (double)(elapsedMillis / 1000L) * (double)connections);
                summary.add(baos.toString());
                baos.reset();
                ps.format("Aggregate Throughput: %.2f bytes/s", (double)bytesReturned / (double)(elapsedMillis / 1000L) * (double)connections);
            }
            summary.add(baos.toString());
            if (setup.type == Setup.Type.READ) {
                baos.reset();
                ps.format("Average Latency: %.3f milliseconds", (double)cumulativeLatency / (double)requestCount);
                summary.add(baos.toString());
            }
            for (String s : summary) {
                System.out.println(s);
            }
            String summaryFileName = null;
            if (outputDir != null) {
                File out;
                if (outputDir.endsWith("/")) {
                    outputDir = outputDir.substring(0, outputDir.length() - 1);
                }
                if (!(out = new File(outputDir)).isDirectory() && !out.mkdirs()) {
                    System.out.println("Error - unable to create output directory '" + outputDir + "'");
                    System.exit(-1);
                }
                summaryFileName = outputDir + "/";
            } else {
                summaryFileName = "";
            }
            if (testName != null) {
                summaryFileName = summaryFileName + testName + "-";
            }
            summaryFileName = summaryFileName + setup.driver + "-" + Dispatcher.testTypeString(setup.order, setup.type) + "-";
            if (setup.type == Setup.Type.READ && setup.order == Setup.Order.RANDOM) {
                summaryFileName = summaryFileName + (setup.distribution == Setup.Distribution.ZIPFIAN ? "zipfian" : "uniform") + "-";
            }
            summaryFileName = summaryFileName + setup.keyCount + "-" + setup.keySize + "-" + setup.valueSize + "-" + clients + "clients.txt";
            BufferedWriter writer = new BufferedWriter(new FileWriter(summaryFileName));
            for (String s : summary) {
                writer.write(s);
                writer.newLine();
            }
            writer.flush();
            writer.close();
            ReactorFactory.Shutdown();
        }
        catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private static class HandlerFactory
    implements ConnectionHandlerFactory {
        private DispatchHandler mDispatchHandler;

        public HandlerFactory(DispatchHandler cb) {
            this.mDispatchHandler = cb;
        }

        public DispatchHandler newInstance() {
            return this.mDispatchHandler;
        }
    }

    private static class RequestHandler
    implements DispatchHandler {
        private LinkedList<Event> mQueue = new LinkedList();
        private int mConnections = 0;
        private long mStartTime = 0L;
        private long mTimeout;

        public RequestHandler(long timeout) {
            this.mTimeout = timeout;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handle(Event event) {
            if (event.type == Event.Type.CONNECTION_ESTABLISHED) {
                RequestHandler requestHandler = this;
                synchronized (requestHandler) {
                    ++this.mConnections;
                    this.mStartTime = System.currentTimeMillis();
                    this.notify();
                }
            }
            if (event.type == Event.Type.DISCONNECT) {
                RequestHandler requestHandler = this;
                synchronized (requestHandler) {
                    --this.mConnections;
                    this.mStartTime = System.currentTimeMillis();
                    this.notify();
                }
            }
            if (event.type == Event.Type.ERROR) {
                log.info("Error : " + Error.GetText((int)event.error));
                System.exit(1);
            } else if (event.type == Event.Type.MESSAGE) {
                RequestHandler requestHandler = this;
                synchronized (requestHandler) {
                    Event newEvent = new Event(event);
                    this.mQueue.offer(newEvent);
                    this.notify();
                }
            }
        }

        public synchronized Event GetRequest() throws InterruptedException {
            while (this.mQueue.isEmpty()) {
                this.wait();
            }
            return this.mQueue.remove();
        }

        public synchronized int waitForConnections(int clients) throws HypertableException, InterruptedException {
            if (clients > 0 && this.mConnections == clients) {
                return this.mConnections;
            }
            long now = this.mStartTime = System.currentTimeMillis();
            while (now - this.mStartTime < this.mTimeout) {
                this.wait(this.mTimeout - (now - this.mStartTime));
                if (clients > 0 && this.mConnections == clients) break;
                now = System.currentTimeMillis();
            }
            if (clients != 0 && clients < this.mConnections) {
                throw new HypertableException(-1, "Expected " + clients + " clients, only got " + this.mConnections);
            }
            return this.mConnections;
        }
    }
}

