changeset 558:7c805a77d33d

reporter - transport
author Devel 1
date Thu, 21 Sep 2017 11:56:09 +0200
parents f65e374eb0f8
children 4b26fdc22ef2
files stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterImpl.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/PcapReporter.java stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java
diffstat 18 files changed, 683 insertions(+), 665 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java	Thu Sep 21 11:56:09 2017 +0200
@@ -1,7 +1,7 @@
 package com.passus.st.avro;
 
 import com.passus.st.reporter.trx.ServerMain;
-import com.passus.st.reporter.trx.ReporterClient;
+import com.passus.st.reporter.trx.SocketReporterClient;
 import com.passus.st.reporter.protocol.Reporter;
 import com.passus.utils.AllocationUtils;
 import java.io.IOException;
@@ -36,7 +36,7 @@
 public class AvroCustomBenchmark extends AbstractAvroBenchmark {
 
     private final ServerMain server = new ServerMain("localhost", 11111, 500, new DummyReporter());
-    private final ReporterClient client = new ReporterClient(serverAddress, new SynchronousQueue<>(), false);
+    private final SocketReporterClient client = new SocketReporterClient(serverAddress, new SynchronousQueue<>(), false);
     private final TestMetric metric = new TestMetric();
 
     @Setup
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
@@ -0,0 +1,14 @@
+package com.passus.st.reporter;
+
+import com.passus.commons.service.Service;
+
+/**
+ * A {@link Service} that accepts objects to be reported through {@link #send(Object)}.
+ * @author mikolaj.podbielski
+ */
+public interface ReporterClient extends Service {
+
+    public boolean send(Object object); // result tells whether the object was accepted; AvroRpcReporterClient returns false when its queue is full
+
+    void waitForEmptyQueue() throws InterruptedException; // blocks the caller until no objects are pending (AvroRpcReporterClient polls its queue)
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterImpl.java	Thu Sep 21 11:56:09 2017 +0200
@@ -0,0 +1,207 @@
+package com.passus.st.reporter;
+
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import com.passus.st.reporter.protocol.Reporter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+/**
+ * {@link Reporter} implementation that writes received metrics as ';'-separated rows to requests.csv, responses.csv, emitter.csv and (optionally) all.csv.
+ * @author mikolaj.podbielski
+ */
+public class ReporterImpl implements Reporter {
+
+    private final PrintWriter reqFile;
+    private final PrintWriter respFile;
+    private final PrintWriter reqRespFile; // null unless merge is true
+    private final PrintWriter emitterFile;
+    private final boolean merge; // when true, a combined request/response row is also written to all.csv
+    private boolean verbose; // when true, every received record is printed to System.out
+
+    public ReporterImpl() throws IOException {
+        this(false);
+    }
+
+    public ReporterImpl(boolean merge) throws IOException { // opens the output files (relative names); all.csv only when merge is true
+        reqFile = new PrintWriter("requests.csv", "UTF-8");
+        respFile = new PrintWriter("responses.csv", "UTF-8");
+        reqRespFile = merge ? new PrintWriter("all.csv", "UTF-8") : null;
+        emitterFile = new PrintWriter("emitter.csv", "UTF-8");
+        this.merge = merge;
+    }
+
+    public void close() { // closes every writer that was opened by the constructor
+        reqFile.close();
+        respFile.close();
+        if (merge) {
+            reqRespFile.close();
+        }
+        emitterFile.close();
+    }
+
+    public void setVerbose(boolean verbose) {
+        this.verbose = verbose;
+    }
+
+    private static void addValue(StringBuilder builder, Object value) { // appends value in double quotes (nothing for null), then ';'
+        if (value != null) {
+            builder.append('"');
+            builder.append(value.toString());
+            builder.append('"');
+        }
+        builder.append(';');
+    }
+
+    private static String getValue(Map<CharSequence, ? extends Object> map, String header) { // looks header up as a Utf8 key; null map or missing key yields null
+        Object result = null;
+        if (map != null) {
+            result = map.get(new Utf8(header));
+        }
+        return result == null ? null : result.toString();
+    }
+
+    @Override
+    public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException {
+        if (verbose) {
+            System.out.println(metric);
+        }
+
+        String code = metric.getCode().toString();
+
+        if (code.equalsIgnoreCase("httpRequestResponse")) { // metrics with any other code are acknowledged with "OK" but not written
+            Map<CharSequence, Object> fields = metric.getFields();
+            StringBuilder builder = new StringBuilder();
+
+            addValue(builder, getValue(fields, "reqId"));
+            addValue(builder, getValue(fields, "method"));
+            addValue(builder, getValue(fields, "reqVersion"));
+            addValue(builder, getValue(fields, "url"));
+            addValue(builder, getValue(fields, "reqStart"));
+            addValue(builder, getValue(fields, "reqStop"));
+            addValue(builder, getValue(fields, "serverIp"));
+            addValue(builder, getValue(fields, "serverPort"));
+            addValue(builder, getValue(fields, "clientIp"));
+            addValue(builder, getValue(fields, "clientPort"));
+            addValue(builder, getValue(fields, "reqHdrSize"));
+            addValue(builder, getValue(fields, "reqCntSize"));
+            Map<CharSequence, CharSequence> reqHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("reqHdrs"));
+            addValue(builder, getValue(reqHdrs, "User-Agent"));
+            Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc"));
+            addValue(builder, getValue(misc, "sessionId"));
+            addValue(builder, getValue(misc, "username"));
+            reqFile.println(builder.toString());
+            reqFile.flush();
+            builder.setLength(0); // reuse the same builder for the response row
+
+            addValue(builder, getValue(fields, "reqId"));
+            addValue(builder, getValue(fields, "reason"));
+            addValue(builder, getValue(fields, "code"));
+            addValue(builder, getValue(fields, "respStart"));
+            addValue(builder, getValue(fields, "respStop"));
+            Map<CharSequence, CharSequence> respHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("respHdrs"));
+            addValue(builder, getValue(respHdrs, "Content-Type"));
+            addValue(builder, getValue(fields, "respHdrSize"));
+            addValue(builder, getValue(fields, "respCntSize"));
+            respFile.println(builder.toString());
+            respFile.flush();
+            builder.setLength(0);
+
+            // unused keys:
+            // origClientIp origClientPort origServerIp origServerPort
+            // respVersion
+            long reqStart = Long.parseLong(getValue(fields, "reqStart")); // NOTE(review): getValue returns null for an absent field, which makes parseLong throw NumberFormatException
+            long reqStop = Long.parseLong(getValue(fields, "reqStop"));
+            long respStart = Long.parseLong(getValue(fields, "respStart"));
+            long respStop = Long.parseLong(getValue(fields, "respStop"));
+            String url = getValue(fields, "url");
+            printDuration(reqStart, reqStop, "req", url);
+            printDuration(reqStop, respStart, "r2r", url);
+            printDuration(respStart, respStop, "res", url);
+
+            if (merge) {
+                addValue(builder, getValue(fields, "reqId"));
+                addValue(builder, getValue(fields, "clientPort"));
+                addValue(builder, getValue(fields, "reqStart"));
+                addValue(builder, getValue(fields, "reqStop"));
+                addValue(builder, getValue(fields, "respStart"));
+                addValue(builder, getValue(fields, "respStop"));
+                addValue(builder, String.valueOf(reqStop - reqStart));
+                addValue(builder, String.valueOf(respStart - reqStop));
+                addValue(builder, String.valueOf(respStop - respStart));
+                addValue(builder, getValue(fields, "reqHdrSize"));
+                addValue(builder, getValue(fields, "reqCntSize"));
+                addValue(builder, getValue(fields, "respHdrSize"));
+                addValue(builder, getValue(fields, "respCntSize"));
+                addValue(builder, getValue(fields, "code"));
+                addValue(builder, getValue(fields, "method"));
+                addValue(builder, getValue(fields, "url"));
+                builder.setLength(builder.length() - 1); // drop the trailing ';' (only the merged row does this)
+                reqRespFile.println(builder.toString());
+                reqRespFile.flush();
+                builder.setLength(0);
+            }
+        }
+
+        return "OK";
+    }
+
+    private static void printDuration(long start, long end, String name, String url) { // prints a line to System.out when end - start exceeds 600
+        long diff = end - start;
+        if (diff > 600) { // NOTE(review): timestamp unit is not visible here - presumably milliseconds, confirm against the sender
+            System.out.println("long " + name + ": " + diff + " " + url);
+        }
+    }
+
+    @Override
+    public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException {
+        if (verbose) {
+            System.out.println(collection);
+        }
+        StringBuilder builder = new StringBuilder();
+
+        addValue(builder, collection.getStartTimestamp());
+        addValue(builder, collection.getEndTimestamp());
+
+        MetricRecord emitterMetric = findMetric(collection, "emitter");
+        if (emitterMetric != null) {
+            Map<CharSequence, Object> emitterFields = emitterMetric.getFields();
+            addValue(builder, emitterFields.get(new Utf8("receivedBytes")));
+            addValue(builder, emitterFields.get(new Utf8("sentBytes")));
+            addValue(builder, emitterFields.get(new Utf8("establishedConnections")));
+            addValue(builder, emitterFields.get(new Utf8("closedConnections")));
+            addValue(builder, emitterFields.get(new Utf8("bindErrors")));
+
+        } else {
+            builder.append(";;;;;"); // five empty columns keep the later columns aligned
+        }
+
+        emitterMetric = findMetric(collection, "pcapSource"); // variable is reused for the pcapSource metric
+        if (emitterMetric != null) {
+            Map<CharSequence, Object> emitterFields = emitterMetric.getFields();
+            addValue(builder, emitterFields.get(new Utf8("frames")));
+            addValue(builder, emitterFields.get(new Utf8("tcpPackets")));
+        } else {
+            builder.append(";;"); // two empty pcapSource columns
+        }
+
+        emitterFile.println(builder.toString());
+        emitterFile.flush();
+
+        return "OK";
+    }
+
+    private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) { // first metric whose code equals name, or null
+        List<MetricRecord> metrics = collection.getMetrics();
+        for (MetricRecord metric : metrics) {
+            if (metric.getCode().toString().equals(name)) {
+                return metric;
+            }
+        }
+        return null;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
@@ -0,0 +1,185 @@
+package com.passus.st.reporter.server;
+
+import com.passus.commons.metric.Metric;
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.commons.utils.SimpleThreadFactory;
+import com.passus.st.reporter.MetricConverter;
+import com.passus.st.reporter.ReporterClient;
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import com.passus.st.reporter.protocol.Reporter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * {@link ReporterClient} that queues submitted objects and sends them to a {@link Reporter} proxy over a NettyTransceiver from a background thread.
+ * @author mikolaj.podbielski
+ */
+public class AvroRpcReporterClient implements ReporterClient {
+
+    private static final Logger LOGGER = LogManager.getLogger(AvroRpcReporterClient.class);
+
+    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096); // bounded buffer between send() callers and the sender thread
+    private final MetricConverter converter = new MetricConverter();
+    private final InetSocketAddress serverAddress;
+    private volatile boolean working; // set by start()/stop(), read by the sender thread's loop
+    private SenderThread sender;
+
+    public AvroRpcReporterClient(InetSocketAddress serverAddress) {
+        this.serverAddress = serverAddress;
+    }
+
+    @Override
+    public boolean send(Object object) {
+        if (!queue.offer(object)) { // non-blocking: the object is dropped when the queue is full
+            System.out.println("reporter queue full");
+            LOGGER.debug("Could not enqueue message.");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return working;
+    }
+
+    @Override
+    public void start() {
+        if (working) {
+            return;
+        }
+
+        working = true;
+        sender = new SenderThread();
+        try {
+            sender.connect();
+        } catch (IOException ignore) { // ignored on purpose: run() calls connect() again while not connected
+        }
+        sender.start();
+    }
+
+    @Override
+    public void stop() {
+        if (working) {
+            working = false;
+            sender.interrupt(); // wakes the sender if it is blocked in queue.take()
+            try {
+                sender.join(500); // wait at most 500 ms for the sender to finish
+            } catch (Exception ex) {
+            }
+            sender = null;
+        }
+    }
+
+    @Override
+    public void waitForEmptyQueue() throws InterruptedException {
+        while (queue.isEmpty() == false) { // an empty queue only means the last object was taken, not that it has already been sent
+            Thread.sleep(100);
+        }
+    }
+
+    private static Executor pool(String name, int corePoolSize) { // executor with corePoolSize core threads, no upper bound and 60 s keep-alive
+        SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name);
+        return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+                new SynchronousQueue<>(), factory);
+    }
+
+    private class SenderThread extends Thread {
+
+        private static final long RETRY_PERIOD = 2000; // compared against System.currentTimeMillis(), i.e. milliseconds
+
+        private NettyTransceiver client;
+        private Reporter proxy;
+        private long lastFailure;
+
+        private void connect() throws IOException {
+            if (lastFailure + RETRY_PERIOD > System.currentTimeMillis()) { // back off: no reconnect attempt within RETRY_PERIOD of the last failure
+                return;
+            }
+
+            NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
+                    pool(" Boss", 1), pool(" I/O Worker", 4));
+            try {
+                client = new NettyTransceiver(serverAddress, factory, 10_000L); // NOTE(review): 10_000L is presumably the connect timeout in ms - confirm against the NettyTransceiver API
+                proxy = (Reporter) SpecificRequestor.getClient(Reporter.class, client);
+            } catch (Exception ex) {
+                lastFailure = System.currentTimeMillis();
+                disconnect();
+                factory.releaseExternalResources();
+                throw ex;
+            }
+        }
+
+        private void disconnect() {
+            if (client != null) {
+                client.close();
+            }
+            proxy = null;
+            client = null;
+        }
+
+        private boolean isConnected() {
+            return proxy != null;
+        }
+
+        @Override
+        public void run() {
+            while (working) {
+                try {
+                    // TODO: refactor
+                    Object metric = queue.take();
+                    Object record;
+                    if (metric instanceof Metric) {
+                        record = converter.convert((Metric) metric, false);
+                    } else if (metric instanceof MetricsCollection) {
+                        record = converter.convert((MetricsCollection) metric);
+                    } else {
+                        continue; // objects of any other type are silently discarded
+                    }
+
+                    if (!isConnected()) {
+                        connect();
+                    }
+                    if (isConnected()) { // if still not connected, the record taken above is dropped
+                        CharSequence result;
+                        if (record instanceof MetricRecord) {
+                            result = proxy.handleMetric((MetricRecord) record);
+                        } else if (record instanceof MetricsCollectionRecord) {
+                            result = proxy.handleMetricsCollection((MetricsCollectionRecord) record);
+                        } else {
+                            System.out.println(metric.getClass());
+                            continue;
+                        }
+                        LOGGER.trace("result: {}", result);
+                    }
+
+                } catch (InterruptedException ex) {
+                    LOGGER.trace("Queue.take() was interrupted."); // the loop condition then re-checks working, which stop() clears before interrupting
+                } catch (AvroRemoteException ex) {
+                    LOGGER.warn("Could not send.", ex);
+                } catch (IOException ex) {
+                    LOGGER.warn("Could not connect.", ex);
+                } catch (Exception ex) {
+                    LOGGER.error(ex);
+                }
+            }
+
+            disconnect();
+            LOGGER.debug("Sender stopped.");
+        }
+
+    }
+}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java	Wed Sep 20 23:11:11 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,179 +0,0 @@
-package com.passus.st.reporter.server;
-
-import com.passus.commons.metric.Metric;
-import com.passus.commons.metric.MetricsCollection;
-import com.passus.commons.service.Service;
-import com.passus.commons.utils.SimpleThreadFactory;
-import com.passus.st.reporter.MetricConverter;
-import com.passus.st.reporter.protocol.MetricRecord;
-import com.passus.st.reporter.protocol.MetricsCollectionRecord;
-import com.passus.st.reporter.protocol.Reporter;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public class ReporterClient implements Service {
-
-    private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class);
-
-    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096);
-    private final MetricConverter converter = new MetricConverter();
-    private final InetSocketAddress serverAddress;
-    private volatile boolean working;
-    private SenderThread sender;
-
-    public ReporterClient(InetSocketAddress serverAddress) {
-        this.serverAddress = serverAddress;
-    }
-
-    public void send(Object object) {
-        if (!queue.offer(object)) {
-            LOGGER.debug("Could not enqueue message.");
-        }
-    }
-
-    @Override
-    public boolean isStarted() {
-        return working;
-    }
-
-    @Override
-    public void start() {
-        if (working) {
-            return;
-        }
-
-        working = true;
-        sender = new SenderThread();
-        try {
-            sender.connect();
-        } catch (IOException ignore) {
-        }
-        sender.start();
-    }
-
-    @Override
-    public void stop() {
-        if (working) {
-            working = false;
-            sender.interrupt();
-            try {
-                sender.join(500);
-            } catch (Exception ex) {
-            }
-            sender = null;
-        }
-    }
-
-    public void waitForEmptyQueue() throws InterruptedException {
-        while (queue.isEmpty() == false) {
-            Thread.sleep(100);
-        }
-    }
-
-    private static Executor pool(String name, int corePoolSize) {
-        SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name);
-        return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
-                                      new SynchronousQueue<>(), factory);
-    }
-
-    private class SenderThread extends Thread {
-
-        private static final long RETRY_PERIOD = 2000;
-
-        private NettyTransceiver client;
-        private Reporter proxy;
-        private long lastFailure;
-
-        private void connect() throws IOException {
-            if (lastFailure + RETRY_PERIOD > System.currentTimeMillis()) {
-                return;
-            }
-
-            NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
-                    pool(" Boss", 1), pool(" I/O Worker", 4));
-            try {
-                client = new NettyTransceiver(serverAddress, factory, 10_000L);
-                proxy = (Reporter) SpecificRequestor.getClient(Reporter.class, client);
-            } catch (Exception ex) {
-                lastFailure = System.currentTimeMillis();
-                disconnect();
-                factory.releaseExternalResources();
-                throw ex;
-            }
-        }
-
-        private void disconnect() {
-            if (client != null) {
-                client.close();
-            }
-            proxy = null;
-            client = null;
-        }
-
-        private boolean isConnected() {
-            return proxy != null;
-        }
-
-        @Override
-        public void run() {
-            while (working) {
-                try {
-                    // TODO: refactor
-                    Object metric = queue.take();
-                    Object record;
-                    if (metric instanceof Metric) {
-                        record = converter.convert((Metric) metric, false);
-                    } else if (metric instanceof MetricsCollection) {
-                        record = converter.convert((MetricsCollection) metric);
-                    } else {
-                        continue;
-                    }
-
-                    if (!isConnected()) {
-                        connect();
-                    }
-                    if (isConnected()) {
-                        CharSequence result;
-                        if (record instanceof MetricRecord) {
-                            result = proxy.handleMetric((MetricRecord) record);
-                        } else if (record instanceof MetricsCollectionRecord) {
-                            result = proxy.handleMetricsCollection((MetricsCollectionRecord) record);
-                        } else {
-                            continue;
-                        }
-                        LOGGER.trace("result: {}", result);
-                    }
-
-                } catch (InterruptedException ex) {
-                    LOGGER.trace("Queue.take() was interrupted.");
-                } catch (AvroRemoteException ex) {
-                    LOGGER.warn("Could not send.", ex);
-                } catch (IOException ex) {
-                    LOGGER.warn("Could not connect.", ex);
-                } catch (Exception ex) {
-                    LOGGER.error(ex);
-                }
-            }
-
-            disconnect();
-            LOGGER.debug("Sender stopped.");
-        }
-
-    }
-}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java	Wed Sep 20 23:11:11 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,207 +0,0 @@
-package com.passus.st.reporter.server;
-
-import com.passus.st.reporter.protocol.MetricRecord;
-import com.passus.st.reporter.protocol.MetricsCollectionRecord;
-import com.passus.st.reporter.protocol.Reporter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.util.Utf8;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public class ReporterImpl implements Reporter {
-
-    private final PrintWriter reqFile;
-    private final PrintWriter respFile;
-    private final PrintWriter reqRespFile;
-    private final PrintWriter emitterFile;
-    private final boolean merge;
-    private boolean verbose;
-
-    public ReporterImpl() throws IOException {
-        this(false);
-    }
-
-    public ReporterImpl(boolean merge) throws IOException {
-        reqFile = new PrintWriter("requests.csv", "UTF-8");
-        respFile = new PrintWriter("responses.csv", "UTF-8");
-        reqRespFile = merge ? new PrintWriter("all.csv", "UTF-8") : null;
-        emitterFile = new PrintWriter("emitter.csv", "UTF-8");
-        this.merge = merge;
-    }
-
-    public void close() {
-        reqFile.close();
-        respFile.close();
-        if (merge) {
-            reqRespFile.close();
-        }
-        emitterFile.close();
-    }
-
-    public void setVerbose(boolean verbose) {
-        this.verbose = verbose;
-    }
-
-    private static void addValue(StringBuilder builder, Object value) {
-        if (value != null) {
-            builder.append('"');
-            builder.append(value.toString());
-            builder.append('"');
-        }
-        builder.append(';');
-    }
-
-    private static String getValue(Map<CharSequence, ? extends Object> map, String header) {
-        Object result = null;
-        if (map != null) {
-            result = map.get(new Utf8(header));
-        }
-        return result == null ? null : result.toString();
-    }
-
-    @Override
-    public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException {
-        if (verbose) {
-            System.out.println(metric);
-        }
-
-        String code = metric.getCode().toString();
-
-        if (code.equalsIgnoreCase("httpRequestResponse")) {
-            Map<CharSequence, Object> fields = metric.getFields();
-            StringBuilder builder = new StringBuilder();
-
-            addValue(builder, getValue(fields, "reqId"));
-            addValue(builder, getValue(fields, "method"));
-            addValue(builder, getValue(fields, "reqVersion"));
-            addValue(builder, getValue(fields, "url"));
-            addValue(builder, getValue(fields, "reqStart"));
-            addValue(builder, getValue(fields, "reqStop"));
-            addValue(builder, getValue(fields, "serverIp"));
-            addValue(builder, getValue(fields, "serverPort"));
-            addValue(builder, getValue(fields, "clientIp"));
-            addValue(builder, getValue(fields, "clientPort"));
-            addValue(builder, getValue(fields, "reqHdrSize"));
-            addValue(builder, getValue(fields, "reqCntSize"));
-            Map<CharSequence, CharSequence> reqHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("reqHdrs"));
-            addValue(builder, getValue(reqHdrs, "User-Agent"));
-            Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc"));
-            addValue(builder, getValue(misc, "sessionId"));
-            addValue(builder, getValue(misc, "username"));
-            reqFile.println(builder.toString());
-            reqFile.flush();
-            builder.setLength(0);
-
-            addValue(builder, getValue(fields, "reqId"));
-            addValue(builder, getValue(fields, "reason"));
-            addValue(builder, getValue(fields, "code"));
-            addValue(builder, getValue(fields, "respStart"));
-            addValue(builder, getValue(fields, "respStop"));
-            Map<CharSequence, CharSequence> respHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("respHdrs"));
-            addValue(builder, getValue(respHdrs, "Content-Type"));
-            addValue(builder, getValue(fields, "respHdrSize"));
-            addValue(builder, getValue(fields, "respCntSize"));
-            respFile.println(builder.toString());
-            respFile.flush();
-            builder.setLength(0);
-
-            // unused keys:
-            // origClientIp origClientPort origServerIp origServerPort
-            // respVersion
-            long reqStart = Long.parseLong(getValue(fields, "reqStart"));
-            long reqStop = Long.parseLong(getValue(fields, "reqStop"));
-            long respStart = Long.parseLong(getValue(fields, "respStart"));
-            long respStop = Long.parseLong(getValue(fields, "respStop"));
-            String url = getValue(fields, "url");
-            printDuration(reqStart, reqStop, "req", url);
-            printDuration(reqStop, respStart, "r2r", url);
-            printDuration(respStart, respStop, "res", url);
-
-            if (merge) {
-                addValue(builder, getValue(fields, "reqId"));
-                addValue(builder, getValue(fields, "clientPort"));
-                addValue(builder, getValue(fields, "reqStart"));
-                addValue(builder, getValue(fields, "reqStop"));
-                addValue(builder, getValue(fields, "respStart"));
-                addValue(builder, getValue(fields, "respStop"));
-                addValue(builder, String.valueOf(reqStop - reqStart));
-                addValue(builder, String.valueOf(respStart - reqStop));
-                addValue(builder, String.valueOf(respStop - respStart));
-                addValue(builder, getValue(fields, "reqHdrSize"));
-                addValue(builder, getValue(fields, "reqCntSize"));
-                addValue(builder, getValue(fields, "respHdrSize"));
-                addValue(builder, getValue(fields, "respCntSize"));
-                addValue(builder, getValue(fields, "code"));
-                addValue(builder, getValue(fields, "method"));
-                addValue(builder, getValue(fields, "url"));
-                builder.setLength(builder.length() - 1);
-                reqRespFile.println(builder.toString());
-                reqRespFile.flush();
-                builder.setLength(0);
-            }
-        }
-
-        return "OK";
-    }
-
-    private static void printDuration(long start, long end, String name, String url) {
-        long diff = end - start;
-        if (diff > 600) {
-            System.out.println("long " + name + ": " + diff + " " + url);
-        }
-    }
-
-    @Override
-    public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException {
-        if (verbose) {
-            System.out.println(collection);
-        }
-        StringBuilder builder = new StringBuilder();
-
-        addValue(builder, collection.getStartTimestamp());
-        addValue(builder, collection.getEndTimestamp());
-
-        MetricRecord emitterMetric = findMetric(collection, "emitter");
-        if (emitterMetric != null) {
-            Map<CharSequence, Object> emitterFields = emitterMetric.getFields();
-            addValue(builder, emitterFields.get(new Utf8("receivedBytes")));
-            addValue(builder, emitterFields.get(new Utf8("sentBytes")));
-            addValue(builder, emitterFields.get(new Utf8("establishedConnections")));
-            addValue(builder, emitterFields.get(new Utf8("closedConnections")));
-            addValue(builder, emitterFields.get(new Utf8("bindErrors")));
-
-        } else {
-            builder.append(";;;;;");
-        }
-
-        emitterMetric = findMetric(collection, "pcapSource");
-        if (emitterMetric != null) {
-            Map<CharSequence, Object> emitterFields = emitterMetric.getFields();
-            addValue(builder, emitterFields.get(new Utf8("frames")));
-            addValue(builder, emitterFields.get(new Utf8("tcpPackets")));
-        } else {
-            builder.append(";;");
-        }
-
-        emitterFile.println(builder.toString());
-        emitterFile.flush();
-
-        return "OK";
-    }
-
-    private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) {
-        List<MetricRecord> metrics = collection.getMetrics();
-        for (MetricRecord metric : metrics) {
-            if (metric.getCode().toString().equals(name)) {
-                return metric;
-            }
-        }
-        return null;
-    }
-}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Thu Sep 21 11:56:09 2017 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.reporter.server;
 
+import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.reporter.SnmpLogger;
 import com.passus.st.reporter.protocol.Reporter;
 import java.io.IOException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java	Wed Sep 20 23:11:11 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,205 +0,0 @@
-package com.passus.st.reporter.trx;
-
-import com.passus.commons.metric.Metric;
-import com.passus.commons.metric.MetricsCollection;
-import com.passus.commons.service.Service;
-import com.passus.st.reporter.MetricConverter;
-import com.passus.st.reporter.protocol.MetricRecord;
-import com.passus.st.reporter.protocol.MetricsCollectionRecord;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import org.apache.avro.AvroRemoteException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public class ReporterClient implements Service {
-
-    private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class);
-
-    private final MetricConverter converter = new MetricConverter();
-    private final InetSocketAddress serverAddress;
-    private final BlockingQueue<Object> queue;
-    private final boolean dropIfQueueFull;
-    private volatile boolean working;
-//    private SenderThread sender;
-    private SenderThread[] senders = new SenderThread[4];
-
-    public ReporterClient(InetSocketAddress serverAddress) {
-        this.serverAddress = serverAddress;
-        this.queue = new ArrayBlockingQueue<>(4096);
-        this.dropIfQueueFull = true;
-    }
-
-    public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
-        this.serverAddress = serverAddress;
-        this.queue = queue;
-        this.dropIfQueueFull = dropIfQueueFull;
-    }
-
-    @Override
-    public boolean isStarted() {
-        return working;
-    }
-
-    @Override
-    public void start() {
-        if (working) {
-            return;
-        }
-
-        working = true;
-//        sender = new SenderThread();
-//        try {
-//            sender.connect();
-//        } catch (IOException ignore) {
-//        }
-//        sender.start();
-        for (int i = 0; i < senders.length; i++) {
-            senders[i] = new SenderThread();
-            try {
-                senders[i].connect();
-            } catch (IOException ignore) {
-            }
-            senders[i].start();
-        }
-
-    }
-
-    @Override
-    public void stop() {
-        if (working) {
-            working = false;
-//            sender.interrupt();
-//            try {
-//                sender.join(500);
-//            } catch (Exception ex) {
-//            }
-//            sender = null;
-            for (int i = 0; i < senders.length; i++) {
-                senders[i].interrupt();
-                try {
-                    senders[i].join(500);
-                } catch (Exception ex) {
-                }
-                senders[i] = null;
-            }
-        }
-    }
-
-    public void waitForEmptyQueue() throws InterruptedException {
-        while (queue.isEmpty() == false) {
-            Thread.sleep(100);
-        }
-    }
-
-    public boolean send(Object object) {
-        if (dropIfQueueFull) {
-            if (!queue.offer(object)) {
-                LOGGER.debug("Could not enqueue message.");
-                return false;
-            }
-        } else {
-            try {
-                queue.put(object);
-            } catch (InterruptedException ignore) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private class SenderThread extends Thread {
-
-        private final ReporterEncoder encoder = new ReporterEncoder();
-        private Socket socket; // only synchronized access
-        private volatile DataOutputStream os;
-
-        private synchronized void connect() throws IOException {
-            try {
-                socket = new Socket(serverAddress.getAddress(), serverAddress.getPort());
-                os = new DataOutputStream(socket.getOutputStream());
-            } catch (IOException ex) {
-                LOGGER.warn("Could not connect.", ex);
-            }
-        }
-
-        private synchronized void disconnect() {
-            try {
-                if (socket != null) {
-                    socket.close();
-                    socket = null;
-                    os = null;
-                }
-            } catch (IOException ignore) {
-            }
-        }
-
-        private boolean isConnected() {
-            return os != null;
-        }
-
-        @Override
-        public void run() {
-            while (working) {
-                try {
-                    // TODO: refactor
-                    Object metric = queue.take();
-                    int code;
-                    byte[] bytes;
-                    if (metric instanceof Metric) {
-                        MetricRecord record = converter.convert((Metric) metric, false);
-                        code = ReporterDispatcher.TYPE_METRIC;
-                        bytes = encoder.encode(record);
-                    } else if (metric instanceof MetricsCollection) {
-                        MetricsCollectionRecord record = converter.convert((MetricsCollection) metric);
-                        code = ReporterDispatcher.TYPE_METRICS_COLLECTION;
-                        bytes = encoder.encode(record);
-                    } else {
-                        continue;
-                    }
-
-                    if (!isConnected()) {
-                        connect();
-                    }
-                    if (isConnected()) {
-//                        CharSequence result;
-//                        if (record instanceof MetricRecord) {
-//                            result = proxy.handleMetric((MetricRecord) record);
-//                        } else if (record instanceof MetricsCollectionRecord) {
-//                            result = proxy.handleMetricsCollection((MetricsCollectionRecord) record);
-//                        } else {
-//                            continue;
-//                        }
-//                        LOGGER.trace("result: {}", result);
-                        synchronized (this) {
-                            os.writeInt(bytes.length + 4);
-                            os.writeInt(code);
-                            os.write(bytes);
-                        }
-                    }
-                } catch (InterruptedException ex) {
-                    LOGGER.trace("Queue.take() was interrupted.");
-                } catch (AvroRemoteException ex) {
-                    LOGGER.warn("Could not send.", ex);
-                } catch (IOException ex) {
-                    LOGGER.warn("Could not connect.", ex);
-                } catch (Exception ex) {
-                    LOGGER.error(ex);
-                }
-
-            }
-
-            disconnect();
-            LOGGER.debug("Sender stopped.");
-        }
-
-    }
-}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java	Thu Sep 21 11:56:09 2017 +0200
@@ -1,7 +1,7 @@
 package com.passus.st.reporter.trx;
 
 import com.passus.st.reporter.protocol.Reporter;
-import com.passus.st.reporter.server.ReporterImpl;
+import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.utils.CliUtils;
 import static com.passus.st.utils.CliUtils.option;
 import io.netty.bootstrap.ServerBootstrap;
@@ -31,7 +31,7 @@
  */
 public class ServerMain {
 
-    public static final int PORT = 400;
+    public static final int PORT = 11111;
     public static final boolean EPOLL = false;
 
     private final String bindHost;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
@@ -0,0 +1,272 @@
+package com.passus.st.reporter.trx;
+
+import com.passus.commons.metric.Metric;
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.st.reporter.MetricConverter;
+import com.passus.st.reporter.ReporterClient;
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.avro.AvroRemoteException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link ReporterClient} that sends metrics to the reporter server over plain
+ * TCP sockets.
+ * <p>
+ * Messages passed to {@link #send(Object)} are placed on a queue that is
+ * drained by a fixed number of sender threads, each with its own connection.
+ * Every message is written as a frame: an int holding the payload length plus
+ * four, an int type code, and then the encoded payload.
+ *
+ * @author mikolaj.podbielski
+ */
+public class SocketReporterClient implements ReporterClient {
+
+    private static final Logger LOGGER = LogManager.getLogger(SocketReporterClient.class);
+
+    private static final int DEFAULT_QUEUE_CAPACITY = 4096;
+    private static final int SENDER_COUNT = 4;
+
+    private final MetricConverter converter = new MetricConverter();
+    private final InetSocketAddress serverAddress;
+    private final BlockingQueue<Object> queue;
+    private final boolean dropIfQueueFull;
+    private final SenderThread[] senders = new SenderThread[SENDER_COUNT];
+    private volatile boolean working;
+
+    /**
+     * Creates a client with a bounded queue that drops messages when full.
+     *
+     * @param serverAddress address of the reporter server
+     */
+    public SocketReporterClient(InetSocketAddress serverAddress) {
+        this(serverAddress, new ArrayBlockingQueue<Object>(DEFAULT_QUEUE_CAPACITY), true);
+    }
+
+    /**
+     * Creates a client using the given queue.
+     *
+     * @param serverAddress address of the reporter server
+     * @param queue queue holding messages waiting to be sent
+     * @param dropIfQueueFull if {@code true}, {@link #send(Object)} rejects a
+     * message when the queue is full; otherwise it blocks until there is room
+     */
+    public SocketReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
+        this.serverAddress = serverAddress;
+        this.queue = queue;
+        this.dropIfQueueFull = dropIfQueueFull;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return working;
+    }
+
+    /**
+     * Starts the sender threads. Each one tries to connect up front; a failed
+     * attempt is only logged because the sender retries on its next message.
+     */
+    @Override
+    public void start() {
+        if (working) {
+            return;
+        }
+
+        working = true;
+        for (int i = 0; i < senders.length; i++) {
+            SenderThread sender = new SenderThread();
+            try {
+                sender.connect();
+            } catch (IOException ex) {
+                LOGGER.warn("Could not connect.", ex);
+            }
+            senders[i] = sender;
+            sender.start();
+        }
+    }
+
+    /**
+     * Stops the sender threads, waiting up to 500 ms for each of them to end.
+     */
+    @Override
+    public void stop() {
+        if (!working) {
+            return;
+        }
+
+        working = false;
+        boolean interrupted = false;
+        for (int i = 0; i < senders.length; i++) {
+            SenderThread sender = senders[i];
+            senders[i] = null;
+            if (sender == null) {
+                continue;
+            }
+            sender.interrupt();
+            try {
+                sender.join(500);
+            } catch (InterruptedException ex) {
+                interrupted = true;
+            }
+        }
+        if (interrupted) {
+            // Preserve the caller's interrupt status instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Blocks until the queue is empty. A message taken from the queue may
+     * still be in the middle of being sent when this method returns.
+     *
+     * @throws InterruptedException if the calling thread is interrupted
+     */
+    @Override
+    public void waitForEmptyQueue() throws InterruptedException {
+        while (!queue.isEmpty()) {
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * Enqueues a message for sending.
+     *
+     * @param object a {@link Metric} or {@link MetricsCollection}; other types
+     * are discarded by the sender threads
+     * @return {@code true} if the message was enqueued
+     */
+    @Override
+    public boolean send(Object object) {
+        if (dropIfQueueFull) {
+            if (!queue.offer(object)) {
+                LOGGER.debug("Could not enqueue message.");
+                return false;
+            }
+            return true;
+        }
+
+        try {
+            queue.put(object);
+            return true;
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Worker that takes messages off the queue, encodes them and writes them
+     * to its own socket.
+     */
+    private class SenderThread extends Thread {
+
+        private final ReporterEncoder encoder = new ReporterEncoder();
+        private Socket socket; // only synchronized access
+        private volatile DataOutputStream os;
+
+        /**
+         * Opens the connection to the server.
+         *
+         * @throws IOException if the connection cannot be established
+         */
+        private synchronized void connect() throws IOException {
+            Socket newSocket = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+            try {
+                os = new DataOutputStream(newSocket.getOutputStream());
+                socket = newSocket;
+            } catch (IOException ex) {
+                // Do not leak the socket when its stream cannot be obtained.
+                try {
+                    newSocket.close();
+                } catch (IOException closeEx) {
+                    ex.addSuppressed(closeEx);
+                }
+                throw ex;
+            }
+        }
+
+        /**
+         * Closes the connection. The state is reset even if closing fails, so
+         * that the next message triggers a reconnect.
+         */
+        private synchronized void disconnect() {
+            Socket oldSocket = socket;
+            socket = null;
+            os = null;
+            if (oldSocket != null) {
+                try {
+                    oldSocket.close();
+                } catch (IOException ex) {
+                    LOGGER.debug("Could not close socket.", ex);
+                }
+            }
+        }
+
+        private boolean isConnected() {
+            return os != null;
+        }
+
+        /**
+         * Writes one frame to the server. On failure the connection is dropped
+         * so that the next message triggers a reconnect.
+         */
+        private synchronized void write(int code, byte[] bytes) {
+            try {
+                os.writeInt(bytes.length + 4);
+                os.writeInt(code);
+                os.write(bytes);
+            } catch (IOException ex) {
+                LOGGER.warn("Could not send.", ex);
+                disconnect();
+            }
+        }
+
+        @Override
+        public void run() {
+            while (working) {
+                try {
+                    // TODO: refactor
+                    Object metric = queue.take();
+                    int code;
+                    byte[] bytes;
+                    if (metric instanceof Metric) {
+                        MetricRecord record = converter.convert((Metric) metric, false);
+                        code = ReporterDispatcher.TYPE_METRIC;
+                        bytes = encoder.encode(record);
+                    } else if (metric instanceof MetricsCollection) {
+                        MetricsCollectionRecord record = converter.convert((MetricsCollection) metric);
+                        code = ReporterDispatcher.TYPE_METRICS_COLLECTION;
+                        bytes = encoder.encode(record);
+                    } else {
+                        LOGGER.warn("Unsupported message type: {}", metric.getClass());
+                        continue;
+                    }
+
+                    if (!isConnected()) {
+                        connect();
+                    }
+                    write(code, bytes);
+                } catch (InterruptedException ex) {
+                    LOGGER.trace("Queue.take() was interrupted.");
+                } catch (AvroRemoteException ex) {
+                    LOGGER.warn("Could not send.", ex);
+                } catch (IOException ex) {
+                    LOGGER.warn("Could not connect.", ex);
+                } catch (Exception ex) {
+                    LOGGER.error("Unexpected error in sender.", ex);
+                }
+            }
+
+            disconnect();
+            LOGGER.debug("Sender stopped.");
+        }
+
+    }
+}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java	Wed Sep 20 23:11:11 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-package com.passus.st.reporter.server;
-
-import com.passus.st.reporter.protocol.MetricRecord;
-import com.passus.st.reporter.trx.ReporterClient;
-import com.passus.st.reporter.trx.ReporterDispatcher;
-import com.passus.st.reporter.trx.ReporterEncoder;
-import java.io.DataOutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public class Main {
-
-    public static void main(String[] args) throws Exception {
-        Map<CharSequence, Object> fields = new HashMap<>();
-        fields.put("url", "wp.pl/xyz");
-        fields.put("ip", "127.0.0.1");
-        fields.put("port", 12345);
-        fields.put("timestamp", 1234567890L);
-        MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields);
-
-        ReporterEncoder reporterEncoder = new ReporterEncoder();
-        byte[] bytes =  reporterEncoder.encode(metricRecord);
-        int code = ReporterDispatcher.TYPE_METRIC;
-        int length = bytes.length + 4;
-
-        int count = 6;
-        try (Socket s = new Socket("localhost", 400);
-                DataOutputStream os = new DataOutputStream(s.getOutputStream());) {
-
-            for (int i = 0; i < count; i++) {
-                os.writeInt(length);
-                os.writeInt(code);
-                os.write(bytes);
-            }
-        }
-        
-        
-        ReporterClient rc = new ReporterClient(new InetSocketAddress("localhost", 400));
-        rc.start();
-        rc.send(new TestMetric());
-        rc.waitForEmptyQueue();
-        rc.stop();
-    }
-
-}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java	Thu Sep 21 11:56:09 2017 +0200
@@ -2,6 +2,7 @@
 
 import static com.passus.commons.collection.FluentBuilder.*;
 import com.passus.data.ByteString;
+import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.reporter.protocol.MetricRecord;
 import com.passus.st.reporter.protocol.Reporter;
 import java.io.IOException;
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java	Thu Sep 21 11:56:09 2017 +0200
@@ -1,13 +1,15 @@
 package com.passus.st.reporter.server;
 
-import com.passus.commons.metric.Metric;
+import com.passus.st.reporter.ReporterImpl;
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.trx.ReporterDispatcher;
+import com.passus.st.reporter.trx.ReporterEncoder;
 import com.passus.st.reporter.trx.ServerMain;
-import com.passus.st.reporter.trx.ReporterClient;
-import java.io.IOException;
-import java.io.Serializable;
+import com.passus.st.reporter.trx.SocketReporterClient;
+import java.io.DataOutputStream;
+import java.net.Socket;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 /**
  *
@@ -17,20 +19,57 @@
 
     static final TestMetric METRIC = new TestMetric();
 
-    public static void main(String[] args) throws IOException, InterruptedException {
+    public static void main(String[] args) throws Exception {
         ReporterImpl reporter = new ReporterImpl();
         reporter.setVerbose(true);
         ServerMain server = new ServerMain("localhost", ServerMain.PORT, 100, reporter);
         server.start();
         System.out.println("server started");
 
-        ReporterClient client = new ReporterClient(Test.ADDRESS);
-        client.start();
-        client.send(METRIC);
-        client.waitForEmptyQueue();
-        client.stop();
+        client();
 
         server.stop();
     }
 
+    /**
+     * Sends a hand-built metric frame six times over a raw socket, bypassing
+     * the client. Not called from {@code main}; swap it in for
+     * {@code client()} to exercise the wire format directly.
+     */
+    static void manual() throws Exception {
+        Map<CharSequence, Object> fields = new HashMap<>();
+        fields.put("url", "wp.pl/xyz");
+        fields.put("ip", "127.0.0.1");
+        fields.put("port", 12345);
+        fields.put("timestamp", 1234567890L);
+        MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields);
+
+        ReporterEncoder reporterEncoder = new ReporterEncoder();
+        byte[] bytes = reporterEncoder.encode(metricRecord);
+        int code = ReporterDispatcher.TYPE_METRIC;
+        int length = bytes.length + 4;
+
+        int count = 6;
+        try (Socket s = new Socket("localhost", ServerMain.PORT);
+                DataOutputStream os = new DataOutputStream(s.getOutputStream())) {
+
+            for (int i = 0; i < count; i++) {
+                os.writeInt(length);
+                os.writeInt(code);
+                os.write(bytes);
+            }
+        }
+    }
+
+    /**
+     * Sends {@link #METRIC} through a {@link SocketReporterClient} and waits
+     * until the client's queue is drained.
+     */
+    static void client() throws Exception {
+        SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS);
+        rc.start();
+        rc.send(METRIC);
+        rc.waitForEmptyQueue();
+        rc.stop();
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Thu Sep 21 11:56:09 2017 +0200
@@ -24,7 +24,9 @@
 import com.passus.st.metric.FileMetricsCollectionAppender;
 import com.passus.st.metric.ScheduledMetricsCollector;
 import com.passus.st.metric.SummrizeMetricsCollectionHandler;
-import com.passus.st.reporter.server.ReporterClient;
+import com.passus.st.reporter.ReporterClient;
+import com.passus.st.reporter.server.AvroRpcReporterClient;
+import com.passus.st.reporter.trx.SocketReporterClient;
 import com.passus.st.source.PcapSessionEventSource;
 import static com.passus.st.utils.CliUtils.option;
 import com.passus.st.utils.PeriodFormatter;
@@ -156,6 +158,10 @@
                 .build()
         );
 
+        options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.")
+                .hasArg(false)
+                .build());
+
         options.addOption(option("wf", "writeFile").desc("Write result to file.")
                 .hasArg().argName("file").optionalArg(true)
                 .build()
@@ -302,7 +308,12 @@
             if (cl.hasOption("ri")) {
                 int port = 11111;
                 InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri"));
-                reporterClient = new ReporterClient(new InetSocketAddress(addr, port));
+                InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
+                if (cl.hasOption("nrp")) {
+                    reporterClient = new SocketReporterClient(socketAddr);
+                } else {
+                    reporterClient = new AvroRpcReporterClient(socketAddr);
+                }
                 reporterClient.start();
 
                 client.addListener(new HttpReporterClientListener(reporterClient));
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu Sep 21 11:56:09 2017 +0200
@@ -28,7 +28,7 @@
 import com.passus.st.metric.FileMetricsCollectionAppender;
 import com.passus.st.metric.ScheduledMetricsCollector;
 import com.passus.st.metric.SummrizeMetricsCollectionHandler;
-import com.passus.st.reporter.server.ReporterClient;
+import com.passus.st.reporter.trx.SocketReporterClient;
 import com.passus.st.source.PcapSessionEventSource;
 import static com.passus.st.utils.CliUtils.option;
 import com.passus.st.utils.PeriodFormatter;
@@ -43,6 +43,8 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import static com.passus.st.Main.printMetrics;
+import static com.passus.st.utils.CliUtils.option;
 
 /**
  *
@@ -91,7 +93,7 @@
             Log4jConfigurationFactory.enableFactory(logLevel);
 
             InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri"));
-            ReporterClient reporterClient = new ReporterClient(new InetSocketAddress(addr, 11111));
+            SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111));
             reporterClient.start();
 
             LocalHandler lh = new LocalHandler(reporterClient, cl.hasOption("ps"));
@@ -174,7 +176,7 @@
         private volatile int count;
         private volatile boolean finished;
 
-        public LocalHandler(ReporterClient reporterClient, boolean partialSession) {
+        public LocalHandler(SocketReporterClient reporterClient, boolean partialSession) {
             reporter = new HttpReporterClientListener(reporterClient);
             this.partialSession = partialSession;
         }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java	Thu Sep 21 11:56:09 2017 +0200
@@ -14,7 +14,7 @@
 import static com.passus.st.client.http.HttpConsts.TAG_TIME_END;
 import static com.passus.st.client.http.HttpConsts.TAG_TIME_START;
 import com.passus.st.emitter.SessionInfo;
-import com.passus.st.reporter.server.ReporterClient;
+import com.passus.st.reporter.ReporterClient;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java	Thu Sep 21 11:56:09 2017 +0200
@@ -2,7 +2,7 @@
 
 import com.passus.commons.metric.MetricsCollection;
 import com.passus.st.metric.MetricsCollectionHandler;
-import com.passus.st.reporter.server.ReporterClient;
+import com.passus.st.reporter.ReporterClient;
 
 /**
  *
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Wed Sep 20 23:11:11 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Thu Sep 21 11:56:09 2017 +0200
@@ -131,7 +131,7 @@
     public String toString() {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(SessionUtils.transportToString(transport)).append(" ");
+//        sb.append(SessionUtils.transportToString(transport)).append(" ");
         sb.append(srcIp).append(":").append(srcPort);
         sb.append("<->");
         sb.append(dstIp).append(":").append(dstPort);