changeset 557:f65e374eb0f8

reporter - transport
author Devel 1
date Wed, 20 Sep 2017 23:11:11 +0200
parents d5c14b4259ea
children 7c805a77d33d
files stress-tester-benchmark/src/main/java/com/passus/st/avro/AbstractAvroBenchmark.java stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java stress-tester-benchmark/src/main/java/com/passus/st/avro/TestMetric.java stress-tester-reporter/pom.xml stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterEncoder.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestMetric.java
diffstat 14 files changed, 804 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AbstractAvroBenchmark.java	Wed Sep 20 13:29:12 2017 +0200
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AbstractAvroBenchmark.java	Wed Sep 20 23:11:11 2017 +0200
@@ -3,8 +3,11 @@
 import static com.passus.commons.collection.FluentBuilder.e;
 import static com.passus.commons.collection.FluentBuilder.map;
 import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import com.passus.st.reporter.protocol.Reporter;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import org.apache.avro.AvroRemoteException;
 
 /**
  *
@@ -12,7 +15,7 @@
  */
 public class AbstractAvroBenchmark {
 
-    protected final InetSocketAddress serverAddress = new InetSocketAddress(11111);
+    protected final InetSocketAddress serverAddress = new InetSocketAddress("localhost", 11111);
     protected final MetricRecord bigMetricRecord;
     protected final MetricRecord smallMetricRecord;
 
@@ -25,4 +28,18 @@
         bigMetricRecord = new MetricRecord("big", map);
         smallMetricRecord = new MetricRecord("small", nested);
     }
+
+    protected static class DummyReporter implements Reporter {
+        // Reporter stub for benchmarks: both handlers ignore their argument and answer "OK".
+        @Override
+        public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException {
+            return "OK"; // the metric is never inspected
+        }
+
+        @Override
+        public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException {
+            return "OK"; // the collection is never inspected
+        }
+
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,74 @@
+package com.passus.st.avro;
+
+import com.passus.st.reporter.trx.ServerMain;
+import com.passus.st.reporter.trx.ReporterClient;
+import com.passus.st.reporter.protocol.Reporter;
+import com.passus.utils.AllocationUtils;
+import java.io.IOException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.AvroRemoteException;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * JMH benchmark of ReporterClient.send(Object) against a ServerMain started on localhost:11111 with a DummyReporter.
+ * @author mikolaj.podbielski
+ */
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Fork(value = 1)
+@Measurement(iterations = 7)
+@Warmup(iterations = 7)
+public class AvroCustomBenchmark extends AbstractAvroBenchmark {
+
+    private final ServerMain server = new ServerMain("localhost", 11111, 500, new DummyReporter());
+    private final ReporterClient client = new ReporterClient(serverAddress, new SynchronousQueue<>(), false); // false: send() blocks in queue.put()
+    private final TestMetric metric = new TestMetric();
+
+    @Setup
+    public void setup() throws IOException {
+        server.start(); // server first, so the client's initial connect can succeed
+        client.start();
+    }
+
+    @TearDown
+    public void tearDown() {
+        server.stop();
+        client.stop();
+    }
+
+    @Benchmark
+    public Object metric() throws AvroRemoteException {
+        boolean send = client.send(metric); // with a SynchronousQueue this returns once a sender thread has taken the metric
+        if (send == false) { // false here means the blocking put was interrupted
+            throw new IllegalStateException();
+        }
+        return send;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options opt = new OptionsBuilder().include(AvroCustomBenchmark.class.getSimpleName() + ".*").build();
+        new Runner(opt).run();
+        // After the JMH run, drive one benchmark instance by hand through AllocationUtils.
+        AllocationUtils au = new AllocationUtils();
+        AvroCustomBenchmark benchmark = new AvroCustomBenchmark();
+        benchmark.setup();
+        au.checkAllocation("metric", benchmark::metric); // presumably reports allocations of metric() — confirm against AllocationUtils
+        benchmark.client.send(new TestMetric());
+        benchmark.tearDown();
+    }
+}
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java	Wed Sep 20 13:29:12 2017 +0200
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java	Wed Sep 20 23:11:11 2017 +0200
@@ -81,17 +81,4 @@
         benchmark.tearDown();
     }
 
-    private static class DummyReporter implements Reporter {
-
-        @Override
-        public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException {
-            return "OK";
-        }
-
-        @Override
-        public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException {
-            return "OK";
-        }
-
-    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/TestMetric.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,78 @@
+package com.passus.st.avro;
+
+import com.passus.commons.metric.Metric;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Fixed-content Metric used as a benchmark payload: four attributes (String, Long, Integer, Float), always active.
+ * @author mikolaj.podbielski
+ */
+public class TestMetric implements Metric {
+
+    private final HashMap<String, Serializable> attributes = new HashMap<>();
+
+    { // instance initializer: one sample attribute per value type
+        attributes.put("String", "value");
+        attributes.put("Long", 12345L);
+        attributes.put("Integer", 123);
+        attributes.put("Float", 123.45f);
+    }
+
+    @Override
+    public String getName() {
+        return "test";
+    }
+
+    @Override
+    public boolean isActive() {
+        return true;
+    }
+
+    @Override
+    public void activate() { // no-op: isActive() always returns true
+    }
+
+    @Override
+    public void deactivate() { // no-op: isActive() always returns true
+    }
+
+    @Override
+    public Class getAttributeClass(String name) {
+        Serializable value = attributes.get(name);
+        return value.getClass(); // NOTE(review): throws NullPointerException when name is not a known attribute
+    }
+
+    @Override
+    public boolean hasAttribute(String name) {
+        return attributes.containsKey(name);
+    }
+
+    @Override
+    public Set<String> getAttributesName() {
+        return attributes.keySet(); // live view of the map's keys, not a copy
+    }
+
+    @Override
+    public Serializable getAttributeValue(String name) {
+        return attributes.get(name); // null for an unknown name
+    }
+
+    @Override
+    public Map<String, Serializable> getAttributesValue() {
+        return attributes; // exposes the internal mutable map, not a copy
+    }
+
+    @Override
+    public void update(Metric metric) {
+        throw new UnsupportedOperationException("Not supported yet."); // this fixed fixture does not implement updating
+    }
+
+    @Override
+    public void reset() {
+        throw new UnsupportedOperationException("Not supported yet."); // this fixed fixture does not implement resetting
+    }
+
+}
--- a/stress-tester-reporter/pom.xml	Wed Sep 20 13:29:12 2017 +0200
+++ b/stress-tester-reporter/pom.xml	Wed Sep 20 23:11:11 2017 +0200
@@ -79,6 +79,12 @@
             <artifactId>commons-cli</artifactId>
             <version>1.3.1</version>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.6.Final</version>
+            <type>jar</type>
+        </dependency>
     </dependencies>
     
     <build>
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Wed Sep 20 13:29:12 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Wed Sep 20 23:11:11 2017 +0200
@@ -22,7 +22,6 @@
 public class ServerMain {
 
     static final int PORT = 11111;
-    static SnmpLogger snmp;
 
     static void printHelp(Options options) {
         HelpFormatter formatter = new HelpFormatter();
@@ -57,6 +56,7 @@
         );
 
         boolean merge;
+        SnmpLogger snmp;
         try {
             CommandLine cl = new DefaultParser().parse(options, args);
             if (cl.hasOption("s")) {
@@ -69,6 +69,8 @@
                 // Community option without SNMP option specified
                 printError("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address.");
                 return;
+            } else {
+                snmp = null;
             }
 
             merge = cl.hasOption("m");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,205 @@
+package com.passus.st.reporter.trx;
+
+import com.passus.commons.metric.Metric;
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.commons.service.Service;
+import com.passus.st.reporter.MetricConverter;
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.avro.AvroRemoteException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ *
+ * @author mikolaj.podbielski
+ */
+public class ReporterClient implements Service {
+
+    private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class);
+
+    private final MetricConverter converter = new MetricConverter();
+    private final InetSocketAddress serverAddress;
+    private final BlockingQueue<Object> queue;
+    private final boolean dropIfQueueFull;
+    private volatile boolean working;
+//    private SenderThread sender;
+    private SenderThread[] senders = new SenderThread[4];
+
+    public ReporterClient(InetSocketAddress serverAddress) {
+        this.serverAddress = serverAddress;
+        this.queue = new ArrayBlockingQueue<>(4096);
+        this.dropIfQueueFull = true;
+    }
+
+    public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
+        this.serverAddress = serverAddress;
+        this.queue = queue;
+        this.dropIfQueueFull = dropIfQueueFull;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return working;
+    }
+
+    @Override
+    public void start() {
+        if (working) {
+            return;
+        }
+
+        working = true;
+//        sender = new SenderThread();
+//        try {
+//            sender.connect();
+//        } catch (IOException ignore) {
+//        }
+//        sender.start();
+        for (int i = 0; i < senders.length; i++) {
+            senders[i] = new SenderThread();
+            try {
+                senders[i].connect();
+            } catch (IOException ignore) {
+            }
+            senders[i].start();
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        if (working) {
+            working = false;
+//            sender.interrupt();
+//            try {
+//                sender.join(500);
+//            } catch (Exception ex) {
+//            }
+//            sender = null;
+            for (int i = 0; i < senders.length; i++) {
+                senders[i].interrupt();
+                try {
+                    senders[i].join(500);
+                } catch (Exception ex) {
+                }
+                senders[i] = null;
+            }
+        }
+    }
+
+    public void waitForEmptyQueue() throws InterruptedException {
+        while (queue.isEmpty() == false) {
+            Thread.sleep(100);
+        }
+    }
+
+    public boolean send(Object object) {
+        if (dropIfQueueFull) {
+            if (!queue.offer(object)) {
+                LOGGER.debug("Could not enqueue message.");
+                return false;
+            }
+        } else {
+            try {
+                queue.put(object);
+            } catch (InterruptedException ignore) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private class SenderThread extends Thread {
+
+        private final ReporterEncoder encoder = new ReporterEncoder();
+        private Socket socket; // only synchronized access
+        private volatile DataOutputStream os;
+
+        private synchronized void connect() throws IOException {
+            try {
+                socket = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+                os = new DataOutputStream(socket.getOutputStream());
+            } catch (IOException ex) {
+                LOGGER.warn("Could not connect.", ex);
+            }
+        }
+
+        private synchronized void disconnect() {
+            try {
+                if (socket != null) {
+                    socket.close();
+                    socket = null;
+                    os = null;
+                }
+            } catch (IOException ignore) {
+            }
+        }
+
+        private boolean isConnected() {
+            return os != null;
+        }
+
+        @Override
+        public void run() {
+            while (working) {
+                try {
+                    // TODO: refactor
+                    Object metric = queue.take();
+                    int code;
+                    byte[] bytes;
+                    if (metric instanceof Metric) {
+                        MetricRecord record = converter.convert((Metric) metric, false);
+                        code = ReporterDispatcher.TYPE_METRIC;
+                        bytes = encoder.encode(record);
+                    } else if (metric instanceof MetricsCollection) {
+                        MetricsCollectionRecord record = converter.convert((MetricsCollection) metric);
+                        code = ReporterDispatcher.TYPE_METRICS_COLLECTION;
+                        bytes = encoder.encode(record);
+                    } else {
+                        continue;
+                    }
+
+                    if (!isConnected()) {
+                        connect();
+                    }
+                    if (isConnected()) {
+//                        CharSequence result;
+//                        if (record instanceof MetricRecord) {
+//                            result = proxy.handleMetric((MetricRecord) record);
+//                        } else if (record instanceof MetricsCollectionRecord) {
+//                            result = proxy.handleMetricsCollection((MetricsCollectionRecord) record);
+//                        } else {
+//                            continue;
+//                        }
+//                        LOGGER.trace("result: {}", result);
+                        synchronized (this) {
+                            os.writeInt(bytes.length + 4);
+                            os.writeInt(code);
+                            os.write(bytes);
+                        }
+                    }
+                } catch (InterruptedException ex) {
+                    LOGGER.trace("Queue.take() was interrupted.");
+                } catch (AvroRemoteException ex) {
+                    LOGGER.warn("Could not send.", ex);
+                } catch (IOException ex) {
+                    LOGGER.warn("Could not connect.", ex);
+                } catch (Exception ex) {
+                    LOGGER.error(ex);
+                }
+
+            }
+
+            disconnect();
+            LOGGER.debug("Sender stopped.");
+        }
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,62 @@
+package com.passus.st.reporter.trx;
+
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import com.passus.st.reporter.protocol.Reporter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+
+/**
+ * Decodes frames (4-byte big-endian type code followed by an Avro binary payload) and forwards the record to a Reporter.
+ * @author mikolaj.podbielski
+ */
+public class ReporterDispatcher {
+
+    public static final int TYPE_METRIC = 1; // frame carries a MetricRecord
+    public static final int TYPE_METRICS_COLLECTION = 2; // frame carries a MetricsCollectionRecord
+
+    private static final Schema METRIC_SCHEMA = MetricRecord.getClassSchema();
+    private static final Schema METRICS_COLLECTION_SCHEMA = MetricsCollectionRecord.getClassSchema();
+
+    private final DecoderFactory decoderFactory = DecoderFactory.get();
+    private final SpecificDatumReader<MetricRecord> metricReader = new SpecificDatumReader<>(METRIC_SCHEMA);
+    private final SpecificDatumReader<MetricsCollectionRecord> metricsCollectionReader = new SpecificDatumReader<>(METRICS_COLLECTION_SCHEMA);
+
+    private final Reporter reporter;
+
+    public ReporterDispatcher(Reporter reporter) {
+        this.reporter = reporter;
+    }
+
+    public void dispatch(byte[] frame) throws AvroRemoteException, IOException { // frame = type code + Avro payload, without the length prefix
+        int code = getInt4(frame, 0); // NOTE(review): a frame shorter than 4 bytes fails here with ArrayIndexOutOfBoundsException
+        ByteArrayInputStream bais = new ByteArrayInputStream(frame, 4, frame.length - 4); // payload starts after the type code
+        BinaryDecoder decoder = decoderFactory.binaryDecoder(bais, null);
+
+        switch (code) {
+            case TYPE_METRIC:
+                MetricRecord metric = metricReader.read(null, decoder);
+                reporter.handleMetric(metric); // the reporter's return value is discarded
+                break;
+            case TYPE_METRICS_COLLECTION:
+                MetricsCollectionRecord metricsCollection = metricsCollectionReader.read(null, decoder);
+                reporter.handleMetricsCollection(metricsCollection); // the reporter's return value is discarded
+                break;
+            default:
+                System.out.println("Invalid message code: " + code); // unknown codes are reported on stdout and the frame is dropped
+        }
+    }
+
+    // copy-pasted from DataUtils: reads a big-endian 32-bit int from data[off..off+3]
+    public static int getInt4(byte[] data, int off) {
+        return (((data[off] & 0xff) << 24)
+                | ((data[off + 1] & 0xff) << 16)
+                | ((data[off + 2] & 0xff) << 8)
+                | ((data[off + 3] & 0xff)));
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterEncoder.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,42 @@
+package com.passus.st.reporter.trx;
+
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+/**
+ * Serializes MetricRecord and MetricsCollectionRecord instances to Avro binary using one reusable buffer and encoder.
+ * @author mikolaj.podbielski
+ */
+public class ReporterEncoder {
+
+    private static final Schema METRIC_SCHEMA = MetricRecord.getClassSchema();
+    private static final Schema METRICS_COLLECTION_SCHEMA = MetricsCollectionRecord.getClassSchema();
+
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+    private final SpecificDatumWriter<MetricRecord> metricWriter = new SpecificDatumWriter<>(METRIC_SCHEMA);
+    private final SpecificDatumWriter<MetricsCollectionRecord> metricsCollectionWriter = new SpecificDatumWriter<>(METRICS_COLLECTION_SCHEMA);
+
+    private final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); // reused by every encode() call
+    private final BinaryEncoder encoder = encoderFactory.binaryEncoder(baos, null); // writes into baos
+
+    public byte[] encode(MetricRecord metric) throws IOException {
+        return encode(metricWriter, metric);
+    }
+
+    public byte[] encode(MetricsCollectionRecord metricsCollection) throws IOException {
+        return encode(metricsCollectionWriter, metricsCollection);
+    }
+
+    private <T> byte[] encode(SpecificDatumWriter<T> writer, T record) throws IOException { // NOTE(review): baos/encoder are shared across calls; concurrent use of one instance looks unsafe — confirm
+        baos.reset(); // discard the bytes of the previous record
+        writer.write(record, encoder);
+        encoder.flush(); // push the encoder's buffered bytes into baos
+        return baos.toByteArray(); // fresh copy of the bytes written for this record
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,148 @@
+package com.passus.st.reporter.trx;
+
+import com.passus.st.reporter.protocol.Reporter;
+import com.passus.st.reporter.server.ReporterImpl;
+import com.passus.st.utils.CliUtils;
+import static com.passus.st.utils.CliUtils.option;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.bytes.ByteArrayDecoder;
+import java.io.IOException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import static com.passus.st.utils.CliUtils.option;
+
+/**
+ * Netty server that splits the TCP stream into length-prefixed frames and passes each frame to a ReporterDispatcher.
+ * @author Mirosław Hawrot
+ */
+public class ServerMain {
+
+    public static final int PORT = 400;
+    public static final boolean EPOLL = false;
+
+    private final String bindHost;
+    private final int port;
+    private final int numThreads;
+    private final int maxFrameSize = 8192;
+
+    private final EventLoopGroup bossGroup;
+    private final EventLoopGroup workerGroup;
+    private final Class<? extends io.netty.channel.ServerChannel> socketChannelClass;
+
+    private final Reporter reporter;
+
+    public ServerMain(String bindHost, int port, int numThreads, Reporter reporter) { // numThreads sizes the worker group
+        this.bindHost = bindHost;
+        this.port = port;
+        this.numThreads = numThreads;
+        this.reporter = reporter;
+
+        if (EPOLL) {
+            bossGroup = new EpollEventLoopGroup(1); // a single listening socket needs only one acceptor thread
+            workerGroup = new EpollEventLoopGroup(numThreads);
+            socketChannelClass = EpollServerSocketChannel.class;
+        } else {
+            bossGroup = new NioEventLoopGroup(1); // a single listening socket needs only one acceptor thread
+            workerGroup = new NioEventLoopGroup(numThreads);
+            socketChannelClass = NioServerSocketChannel.class;
+        }
+    }
+
+    public void start() { // binds and returns; a failure is printed to stderr, not rethrown
+        try {
+            ServerBootstrap boot = new ServerBootstrap();
+            boot.option(ChannelOption.SO_BACKLOG, 100_000);
+            boot.option(ChannelOption.SO_REUSEADDR, true);
+            boot.group(bossGroup, workerGroup)
+                    .channel(socketChannelClass)
+                    .childHandler(new TestServerInitializer());
+
+            boot.bind(bindHost, port).sync(); // block until the listening socket is bound
+            System.err.println("Server started " + bindHost + ":" + port + " epoll: " + EPOLL);
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    public void stop() { // requests shutdown of both groups without waiting for it to complete
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+        System.err.println("Server stopped.");
+    }
+
+    public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
+
+        @Override
+        public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+
+            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); // 4-byte length at offset 0, stripped from the frame
+            pipeline.addLast("bytesDecoder", new ByteArrayDecoder());
+            pipeline.addLast(new ReporterChannelInboundHandler(reporter)); // one handler (and dispatcher) per connection
+        }
+    }
+
+    public static class ReporterChannelInboundHandler extends SimpleChannelInboundHandler<byte[]> {
+
+        private final ReporterDispatcher dispatcher;
+
+        public ReporterChannelInboundHandler(Reporter reporter) {
+            dispatcher = new ReporterDispatcher(reporter);
+        }
+
+        @Override
+        public void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception {
+            dispatcher.dispatch(data); // data is one frame: type code followed by the Avro payload
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        final String syntax = "server <bind address> [-t threads=500]";
+        final Options options = new Options();
+        options.addOption(option("t", "threads", "number of worker threads", "threads"));
+
+        options.addOption(option("m", "merged").desc("Write file with merged request and response data")
+                .hasArg(false)
+                .build()
+        );
+
+        try {
+            CommandLine cl = new DefaultParser().parse(options, args);
+            String[] clArgs = cl.getArgs();
+            if (clArgs.length != 1) { // exactly one positional argument: the bind address
+                CliUtils.printHelp(options, syntax);
+                return;
+            }
+
+            int threads = Integer.parseInt(cl.getOptionValue('t', "500"));
+
+            ReporterImpl reporter = new ReporterImpl(cl.hasOption("m"));
+            reporter.setVerbose(false);
+
+            ServerMain server = new ServerMain(clArgs[0], PORT, threads, reporter);
+            server.start();
+
+            Runnable shutdown = () -> {
+                System.out.println("shutdown");
+                server.stop();
+            };
+            Runtime.getRuntime().addShutdownHook(new Thread(shutdown));
+        } catch (org.apache.commons.cli.ParseException | NumberFormatException ex) { // bad options or a non-numeric -t value
+            CliUtils.printHelp(options, syntax);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,51 @@
+package com.passus.st.reporter.server;
+
+import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.trx.ReporterClient;
+import com.passus.st.reporter.trx.ReporterDispatcher;
+import com.passus.st.reporter.trx.ReporterEncoder;
+import java.io.DataOutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manual smoke test: writes six raw METRIC frames to localhost:400, then sends one TestMetric through ReporterClient.
+ * @author mikolaj.podbielski
+ */
+public class Main {
+
+    public static void main(String[] args) throws Exception {
+        Map<CharSequence, Object> fields = new HashMap<>();
+        fields.put("url", "wp.pl/xyz");
+        fields.put("ip", "127.0.0.1");
+        fields.put("port", 12345);
+        fields.put("timestamp", 1234567890L);
+        MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields);
+
+        ReporterEncoder reporterEncoder = new ReporterEncoder();
+        byte[] bytes =  reporterEncoder.encode(metricRecord);
+        int code = ReporterDispatcher.TYPE_METRIC;
+        int length = bytes.length + 4; // length field counts the 4-byte type code plus the payload
+
+        int count = 6;
+        try (Socket s = new Socket("localhost", 400);
+                DataOutputStream os = new DataOutputStream(s.getOutputStream());) {
+
+            for (int i = 0; i < count; i++) { // same frame layout that ReporterClient's sender threads write
+                os.writeInt(length);
+                os.writeInt(code);
+                os.write(bytes);
+            }
+        }
+        
+        // Second path: the same server, this time through ReporterClient's queue and sender threads.
+        ReporterClient rc = new ReporterClient(new InetSocketAddress("localhost", 400));
+        rc.start();
+        rc.send(new TestMetric());
+        rc.waitForEmptyQueue(); // only waits for the queue to drain, not for the write to finish
+        rc.stop();
+    }
+
+}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java	Wed Sep 20 13:29:12 2017 +0200
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java	Wed Sep 20 23:11:11 2017 +0200
@@ -20,12 +20,12 @@
  */
 public class Test {
 
-    private static final Map<CharSequence, Object> FIELDS = map(
+    static final Map<CharSequence, Object> FIELDS = map(
             e("int_fld", 1), e("string_fld", "ok"), e("map_fld", smap("some", "entry")),
             e("bstr_fld", ByteString.create("byte-string-value"))
     );
-    private static final MetricRecord METRIC = new MetricRecord("metric-x", FIELDS);
-    private static final InetSocketAddress ADDRESS = new InetSocketAddress(ServerMain.PORT);
+    static final MetricRecord METRIC = new MetricRecord("metric-x", FIELDS);
+    static final InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 11111);
 
     public static void main(String[] args) throws IOException {
         ReporterImpl reporter = new ReporterImpl();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,36 @@
+package com.passus.st.reporter.server;
+
+import com.passus.commons.metric.Metric;
+import com.passus.st.reporter.trx.ServerMain;
+import com.passus.st.reporter.trx.ReporterClient;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Manual end-to-end check: starts the trx ServerMain with a verbose ReporterImpl and sends one TestMetric to it.
+ * @author mikolaj.podbielski
+ */
+public class TestCustom {
+
+    static final TestMetric METRIC = new TestMetric();
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+        ReporterImpl reporter = new ReporterImpl();
+        reporter.setVerbose(true);
+        ServerMain server = new ServerMain("localhost", Test.ADDRESS.getPort(), 100, reporter); // must match the client's port; ServerMain.PORT is 400
+        server.start();
+        System.out.println("server started");
+
+        ReporterClient client = new ReporterClient(Test.ADDRESS);
+        client.start();
+        client.send(METRIC);
+        client.waitForEmptyQueue(); // only waits for the queue to drain, not for the write to finish
+        client.stop();
+
+        server.stop();
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestMetric.java	Wed Sep 20 23:11:11 2017 +0200
@@ -0,0 +1,78 @@
+package com.passus.st.reporter.server;
+
+import com.passus.commons.metric.Metric;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Fixed-content Metric used as a test payload: four attributes (String, Long, Integer, Float), always active.
+ * @author mikolaj.podbielski
+ */
+public class TestMetric implements Metric {
+
+    private final HashMap<String, Serializable> attributes = new HashMap<>();
+
+    { // instance initializer: one sample attribute per value type
+        attributes.put("String", "value");
+        attributes.put("Long", 12345L);
+        attributes.put("Integer", 123);
+        attributes.put("Float", 123.45f);
+    }
+
+    @Override
+    public String getName() {
+        return "test";
+    }
+
+    @Override
+    public boolean isActive() {
+        return true;
+    }
+
+    @Override
+    public void activate() { // no-op: isActive() always returns true
+    }
+
+    @Override
+    public void deactivate() { // no-op: isActive() always returns true
+    }
+
+    @Override
+    public Class getAttributeClass(String name) {
+        Serializable value = attributes.get(name);
+        return value.getClass(); // NOTE(review): throws NullPointerException when name is not a known attribute
+    }
+
+    @Override
+    public boolean hasAttribute(String name) {
+        return attributes.containsKey(name);
+    }
+
+    @Override
+    public Set<String> getAttributesName() {
+        return attributes.keySet(); // live view of the map's keys, not a copy
+    }
+
+    @Override
+    public Serializable getAttributeValue(String name) {
+        return attributes.get(name); // null for an unknown name
+    }
+
+    @Override
+    public Map<String, Serializable> getAttributesValue() {
+        return attributes; // exposes the internal mutable map, not a copy
+    }
+
+    @Override
+    public void update(Metric metric) {
+        throw new UnsupportedOperationException("Not supported yet."); // this fixed fixture does not implement updating
+    }
+
+    @Override
+    public void reset() {
+        throw new UnsupportedOperationException("Not supported yet."); // this fixed fixture does not implement resetting
+    }
+
+}