changeset 559:4b26fdc22ef2

reporter - transport - refactoring, configurable threads
author Devel 1
date Thu, 21 Sep 2017 13:49:27 +0200
parents 7c805a77d33d
children f373af509300
files stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/SnmpLogger.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/utils/CliUtils.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/PcapReporter.java
diffstat 12 files changed, 163 insertions(+), 226 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java	Thu Sep 21 13:49:27 2017 +0200
@@ -1,7 +1,5 @@
 package com.passus.st.avro;
 
-import com.passus.st.reporter.protocol.MetricRecord;
-import com.passus.st.reporter.protocol.MetricsCollectionRecord;
 import com.passus.st.reporter.protocol.Reporter;
 import com.passus.utils.AllocationUtils;
 import java.io.IOException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java	Thu Sep 21 13:49:27 2017 +0200
@@ -1,14 +1,62 @@
 package com.passus.st.reporter;
 
 import com.passus.commons.service.Service;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  *
  * @author mikolaj.podbielski
  */
-public interface ReporterClient extends Service {
+public abstract class ReporterClient implements Service {
 
-    public boolean send(Object object);
+    protected final Logger logger = LogManager.getLogger(getClass());
+    protected final MetricConverter converter = new MetricConverter();
+    protected final InetSocketAddress serverAddress;
+    protected final BlockingQueue<Object> queue;
+    protected final boolean dropIfQueueFull;
 
-    void waitForEmptyQueue() throws InterruptedException;
+    protected volatile int droppedMessages;
+    protected volatile boolean working;
+
+    public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
+        this.serverAddress = serverAddress;
+        this.queue = queue;
+        this.dropIfQueueFull = dropIfQueueFull;
+    }
+
+    /** Enqueues a message; returns false and counts a drop when it cannot be queued. */
+    public boolean send(Object object) {
+        try {
+            if (!dropIfQueueFull) {
+                queue.put(object); // blocks until there is room
+                return true;
+            } else if (queue.offer(object)) {
+                return true;
+            }
+            logger.debug("Could not enqueue message.");
+        } catch (InterruptedException ex) {
+            // Restore the interrupt flag so the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+        ++droppedMessages;
+        return false;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return working;
+    }
+
+    public void waitForEmptyQueue() throws InterruptedException {
+        while (!queue.isEmpty()) { // re-check every 100 ms until nothing is left queued
+            Thread.sleep(100);
+        }
+    }
+
+    public int getDroppedMessages() {
+        return droppedMessages;
+    }
 }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/SnmpLogger.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/SnmpLogger.java	Thu Sep 21 13:49:27 2017 +0200
@@ -54,6 +54,7 @@
      *
      * @param add must be quasi url, (ex. udp:172.16.60.101/161)
      * @param comm name of the SNMP community
+     * @param period logging period (presumably the interval between SNMP polls; units not shown here - TODO confirm)
      */
     public SnmpLogger(final String add, final String comm, final int period) {
         
@@ -71,18 +72,19 @@
         
     }
 
+    @Override
     public void run(){
         try {
             builder = new StringBuilder(200);
             builder.append(System.currentTimeMillis());
             builder.append(";");
-            String cpuLoad1m = this.getAsString(oid_cpuLoad1m);
-            String cpuLoad5m = this.getAsString(oid_cpuLoad5m);
-            String cpuLoad15m = this.getAsString(oid_cpuLoad15m);
-            String ramFree = this.getAsString(oid_ramFree);
-            String swapFree = this.getAsString(oid_swapFree);
-            String ramTotal = this.getAsString(oid_ramTotal);
-            String swapTotal = this.getAsString(oid_swapTotal);
+            String cpuLoad1m = getAsString(oid_cpuLoad1m);
+            String cpuLoad5m = getAsString(oid_cpuLoad5m);
+            String cpuLoad15m = getAsString(oid_cpuLoad15m);
+            String ramFree = getAsString(oid_ramFree);
+            String swapFree = getAsString(oid_swapFree);
+            String ramTotal = getAsString(oid_ramTotal);
+            String swapTotal = getAsString(oid_swapTotal);
             builder.append(cpuLoad1m);
             builder.append(";");
             builder.append(cpuLoad5m);
@@ -133,7 +135,7 @@
      * @throws IOException
      */
     public String getAsString(OID oid) throws IOException {
-        ResponseEvent event = get(new OID[]{oid});
+        ResponseEvent event = get(oid);
         return event.getResponse().get(0).getVariable().toString();
     }
 
@@ -144,7 +146,7 @@
      * @return
      * @throws IOException
      */
-    public ResponseEvent get(OID oids[]) throws IOException {
+    public ResponseEvent get(OID... oids) throws IOException {
         PDU pdu = new PDU();
         for (OID oid : oids) {
             pdu.add(new VariableBinding(oid));
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java	Thu Sep 21 13:49:27 2017 +0200
@@ -3,7 +3,6 @@
 import com.passus.commons.metric.Metric;
 import com.passus.commons.metric.MetricsCollection;
 import com.passus.commons.utils.SimpleThreadFactory;
-import com.passus.st.reporter.MetricConverter;
 import com.passus.st.reporter.ReporterClient;
 import com.passus.st.reporter.protocol.MetricRecord;
 import com.passus.st.reporter.protocol.MetricsCollectionRecord;
@@ -11,7 +10,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -19,56 +17,31 @@
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 /**
  *
  * @author mikolaj.podbielski
  */
-public class AvroRpcReporterClient implements ReporterClient {
-
-    private static final Logger LOGGER = LogManager.getLogger(AvroRpcReporterClient.class);
+public class AvroRpcReporterClient extends ReporterClient {
 
-    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096);
-    private final MetricConverter converter = new MetricConverter();
-    private final InetSocketAddress serverAddress;
-    private volatile boolean working;
     private SenderThread sender;
 
     public AvroRpcReporterClient(InetSocketAddress serverAddress) {
-        this.serverAddress = serverAddress;
-    }
-
-    @Override
-    public boolean send(Object object) {
-        if (!queue.offer(object)) {
-            System.out.println("reporter queue full");
-            LOGGER.debug("Could not enqueue message.");
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public boolean isStarted() {
-        return working;
+        super(serverAddress, new ArrayBlockingQueue<>(4096), true);
     }
 
     @Override
     public void start() {
-        if (working) {
-            return;
+        if (!working) {
+            working = true;
+            sender = new SenderThread();
+            try {
+                sender.connect();
+            } catch (IOException ignore) {
+            }
+            sender.start();
         }
-
-        working = true;
-        sender = new SenderThread();
-        try {
-            sender.connect();
-        } catch (IOException ignore) {
-        }
-        sender.start();
     }
 
     @Override
@@ -84,13 +57,6 @@
         }
     }
 
-    @Override
-    public void waitForEmptyQueue() throws InterruptedException {
-        while (queue.isEmpty() == false) {
-            Thread.sleep(100);
-        }
-    }
-
     private static Executor pool(String name, int corePoolSize) {
         SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name);
         return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
@@ -163,22 +129,22 @@
                             System.out.println(metric.getClass());
                             continue;
                         }
-                        LOGGER.trace("result: {}", result);
+                        logger.trace("result: {}", result);
                     }
 
                 } catch (InterruptedException ex) {
-                    LOGGER.trace("Queue.take() was interrupted.");
+                    logger.trace("Queue.take() was interrupted.");
                 } catch (AvroRemoteException ex) {
-                    LOGGER.warn("Could not send.", ex);
+                    logger.warn("Could not send.", ex);
                 } catch (IOException ex) {
-                    LOGGER.warn("Could not connect.", ex);
+                    logger.warn("Could not connect.", ex);
                 } catch (Exception ex) {
-                    LOGGER.error(ex);
+                    logger.error(ex);
                 }
             }
 
             disconnect();
-            LOGGER.debug("Sender stopped.");
+            logger.debug("Sender stopped.");
         }
 
     }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java	Thu Sep 21 13:49:27 2017 +0200
@@ -3,17 +3,17 @@
 import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.reporter.SnmpLogger;
 import com.passus.st.reporter.protocol.Reporter;
+import com.passus.st.utils.CliUtils;
+import static com.passus.st.utils.CliUtils.option;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
-import static com.passus.st.utils.CliUtils.option;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.ParseException;
 
 /**
@@ -24,18 +24,21 @@
 
     static final int PORT = 11111;
 
-    static void printHelp(Options options) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("[options]", "description", options, "");
-    }
-
-    static void printError(String msg) {
-        System.err.print(msg);
-        System.exit(1);
+    /** Builds the SNMP logger from parsed options; returns null when "s" is absent. */
+    static SnmpLogger create(CommandLine cl) throws ParseException {
+        if (!cl.hasOption("s") && (cl.hasOption("co") || cl.hasOption("p"))) {
+            throw new ParseException("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address.");
+        }
+        int snmpPeriod;
+        try {
+            snmpPeriod = Integer.parseUnsignedInt(cl.getOptionValue("p", "5"));
+        } catch (NumberFormatException ex) { // report a bad period like any other CLI error
+            throw new ParseException("Invalid <snmpPeriod>: " + cl.getOptionValue("p"));
+        }
+        return cl.hasOption("s") ? new SnmpLogger(cl.getOptionValue("s"), cl.getOptionValue("co", "passus"), snmpPeriod) : null;
+    }
 
     public static void main(String[] args) throws IOException {
-
         final Options options = new Options();
 
         options.addOption(option("s", "snmp").desc("Collect SNMP metrics.")
@@ -60,24 +63,11 @@
         SnmpLogger snmp;
         try {
             CommandLine cl = new DefaultParser().parse(options, args);
-            if (cl.hasOption("s")) {
-                // SNMP listener is enabled
-                String snmpAddr = cl.getOptionValue("s");
-                String snmpCommunity = cl.hasOption("co") ? cl.getOptionValue("co") : "passus";
-                int snmpPeriod = cl.hasOption("p") ? Integer.parseUnsignedInt(cl.getOptionValue("p")) : 5;
-                snmp = new SnmpLogger(snmpAddr, snmpCommunity, snmpPeriod);
-            } else if (cl.hasOption("co") || cl.hasOption("p")) {
-                // Community option without SNMP option specified
-                printError("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address.");
-                return;
-            } else {
-                snmp = null;
-            }
-
+            snmp = create(cl);
             merge = cl.hasOption("m");
         } catch (ParseException ex) {
             System.out.println(ex.getMessage());
-            printHelp(options);
+            CliUtils.printHelp(options);
             return;
         }
 
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java	Thu Sep 21 13:49:27 2017 +0200
@@ -3,6 +3,8 @@
 import com.passus.st.reporter.protocol.MetricRecord;
 import com.passus.st.reporter.protocol.MetricsCollectionRecord;
 import com.passus.st.reporter.protocol.Reporter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import org.apache.avro.AvroRemoteException;
@@ -15,7 +17,7 @@
  *
  * @author mikolaj.podbielski
  */
-public class ReporterDispatcher {
+public class ReporterDispatcher extends SimpleChannelInboundHandler<byte[]> {
 
     public static final int TYPE_METRIC = 1;
     public static final int TYPE_METRICS_COLLECTION = 2;
@@ -33,9 +35,10 @@
         this.reporter = reporter;
     }
 
-    public void dispatch(byte[] frame) throws AvroRemoteException, IOException {
-        int code = getInt4(frame, 0);
-        ByteArrayInputStream bais = new ByteArrayInputStream(frame, 4, frame.length - 4);
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws AvroRemoteException, IOException {
+        int code = getInt4(msg, 0);
+        ByteArrayInputStream bais = new ByteArrayInputStream(msg, 4, msg.length - 4);
         BinaryDecoder decoder = decoderFactory.binaryDecoder(bais, null);
 
         switch (code) {
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java	Thu Sep 21 13:49:27 2017 +0200
@@ -1,17 +1,15 @@
 package com.passus.st.reporter.trx;
 
+import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.reporter.protocol.Reporter;
-import com.passus.st.reporter.ReporterImpl;
 import com.passus.st.utils.CliUtils;
 import static com.passus.st.utils.CliUtils.option;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -20,10 +18,11 @@
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
-import static com.passus.st.utils.CliUtils.option;
 
 /**
  *
@@ -34,9 +33,7 @@
     public static final int PORT = 11111;
     public static final boolean EPOLL = false;
 
-    private final String bindHost;
-    private final int port;
-    private final int numThreads;
+    private final SocketAddress serverAddress;
     private final int maxFrameSize = 8192;
 
     private final EventLoopGroup bossGroup;
@@ -45,18 +42,16 @@
 
     private final Reporter reporter;
 
-    public ServerMain(String bindHost, int port, int numThreads, Reporter reporter) {
-        this.bindHost = bindHost;
-        this.port = port;
-        this.numThreads = numThreads;
+    public ServerMain(SocketAddress serverAddress, Reporter reporter) {
+        this.serverAddress = serverAddress;
         this.reporter = reporter;
 
         if (EPOLL) {
-            bossGroup = new EpollEventLoopGroup(numThreads);
+            bossGroup = new EpollEventLoopGroup(1);
             workerGroup = new EpollEventLoopGroup();
             socketChannelClass = EpollServerSocketChannel.class;
         } else {
-            bossGroup = new NioEventLoopGroup(numThreads);
+            bossGroup = new NioEventLoopGroup(1);
             workerGroup = new NioEventLoopGroup();
             socketChannelClass = NioServerSocketChannel.class;
         }
@@ -69,10 +64,10 @@
             boot.option(ChannelOption.SO_REUSEADDR, true);
             boot.group(bossGroup, workerGroup)
                     .channel(socketChannelClass)
-                    .childHandler(new TestServerInitializer());
+                    .childHandler(new ServerInitializer());
 
-            Channel ch = boot.bind(bindHost, port).sync().channel();
-            System.err.println("Server started " + bindHost + ":" + port + " epoll: " + EPOLL);
+            Channel ch = boot.bind(serverAddress).sync().channel();
+            System.err.println("Server started " + serverAddress + " epoll: " + EPOLL);
         } catch (Exception ex) {
             ex.printStackTrace();
         }
@@ -84,36 +79,19 @@
         System.err.println("Server stopped.");
     }
 
-    public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
+    public class ServerInitializer extends ChannelInitializer<SocketChannel> {
 
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
-
             pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
             pipeline.addLast("bytesDecoder", new ByteArrayDecoder());
-            pipeline.addLast(new ReporterChannelInboundHandler(reporter));
-        }
-    }
-
-    public static class ReporterChannelInboundHandler extends SimpleChannelInboundHandler<byte[]> {
-
-        private final ReporterDispatcher dispatcher;
-
-        public ReporterChannelInboundHandler(Reporter reporter) {
-            dispatcher = new ReporterDispatcher(reporter);
-        }
-
-        @Override
-        public void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception {
-            dispatcher.dispatch(data);
+            pipeline.addLast(new ReporterDispatcher(reporter));
         }
     }
 
     public static void main(String[] args) throws IOException {
-        final String syntax = "server <bind address> [-t threads=500]";
         final Options options = new Options();
-        options.addOption(option("t", "threads", "number of worker threads", "threads"));
 
         options.addOption(option("m", "merged").desc("Write file with merged request and response data")
                 .hasArg(false)
@@ -123,17 +101,16 @@
         try {
             CommandLine cl = new DefaultParser().parse(options, args);
             String[] clArgs = cl.getArgs();
-            if (clArgs.length != 1) {
-                CliUtils.printHelp(options, syntax);
+            if (clArgs.length != 0) {
+                CliUtils.printHelp(options);
                 return;
             }
 
-            int threads = Integer.parseInt(cl.getOptionValue('t', "500"));
-
             ReporterImpl reporter = new ReporterImpl(cl.hasOption("m"));
             reporter.setVerbose(false);
 
-            ServerMain server = new ServerMain(clArgs[0], PORT, threads, reporter);
+            InetSocketAddress serverAddress = new InetSocketAddress(PORT);
+            ServerMain server = new ServerMain(serverAddress, reporter);
             server.start();
 
             Runnable shutdown = () -> {
@@ -142,7 +119,7 @@
             };
             Runtime.getRuntime().addShutdownHook(new Thread(shutdown));
         } catch (org.apache.commons.cli.ParseException ex) {
-            CliUtils.printHelp(options, syntax);
+            CliUtils.printHelp(options);
         }
     }
 }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java	Thu Sep 21 13:49:27 2017 +0200
@@ -2,7 +2,6 @@
 
 import com.passus.commons.metric.Metric;
 import com.passus.commons.metric.MetricsCollection;
-import com.passus.st.reporter.MetricConverter;
 import com.passus.st.reporter.ReporterClient;
 import com.passus.st.reporter.protocol.MetricRecord;
 import com.passus.st.reporter.protocol.MetricsCollectionRecord;
@@ -13,76 +12,46 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import org.apache.avro.AvroRemoteException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  *
  * @author mikolaj.podbielski
  */
-public class SocketReporterClient implements ReporterClient {
-
-    private static final Logger LOGGER = LogManager.getLogger(SocketReporterClient.class);
+public class SocketReporterClient extends ReporterClient {
 
-    private final MetricConverter converter = new MetricConverter();
-    private final InetSocketAddress serverAddress;
-    private final BlockingQueue<Object> queue;
-    private final boolean dropIfQueueFull;
-    private volatile boolean working;
-//    private SenderThread sender;
-    private SenderThread[] senders = new SenderThread[4];
+    private final SenderThread[] senders;
 
-    public SocketReporterClient(InetSocketAddress serverAddress) {
-        this.serverAddress = serverAddress;
-        this.queue = new ArrayBlockingQueue<>(4096);
-        this.dropIfQueueFull = true;
+    public SocketReporterClient(InetSocketAddress serverAddress, int threads) {
+        this(serverAddress, threads, new ArrayBlockingQueue<>(4096), true);
     }
 
-    public SocketReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
-        this.serverAddress = serverAddress;
-        this.queue = queue;
-        this.dropIfQueueFull = dropIfQueueFull;
-    }
-
-    @Override
-    public boolean isStarted() {
-        return working;
+    public SocketReporterClient(InetSocketAddress serverAddress, int threads, BlockingQueue<Object> queue, boolean dropIfQueueFull) {
+        super(serverAddress, queue, dropIfQueueFull);
+        if (threads < 1 || threads > 8) {
+            throw new IllegalArgumentException("Allowed between 1 and 8 reporter threads.");
+        }
+        senders = new SenderThread[threads];
     }
 
     @Override
     public void start() {
-        if (working) {
-            return;
+        if (!working) {
+            working = true;
+            for (int i = 0; i < senders.length; i++) {
+                senders[i] = new SenderThread();
+                try {
+                    senders[i].connect();
+                } catch (IOException ignore) {
+                }
+                senders[i].start();
+            }
         }
-
-        working = true;
-//        sender = new SenderThread();
-//        try {
-//            sender.connect();
-//        } catch (IOException ignore) {
-//        }
-//        sender.start();
-        for (int i = 0; i < senders.length; i++) {
-            senders[i] = new SenderThread();
-            try {
-                senders[i].connect();
-            } catch (IOException ignore) {
-            }
-            senders[i].start();
-        }
-
     }
 
     @Override
     public void stop() {
         if (working) {
             working = false;
-//            sender.interrupt();
-//            try {
-//                sender.join(500);
-//            } catch (Exception ex) {
-//            }
-//            sender = null;
             for (int i = 0; i < senders.length; i++) {
                 senders[i].interrupt();
                 try {
@@ -94,31 +63,6 @@
         }
     }
 
-    @Override
-    public void waitForEmptyQueue() throws InterruptedException {
-        while (queue.isEmpty() == false) {
-            Thread.sleep(100);
-        }
-    }
-
-    @Override
-    public boolean send(Object object) {
-        if (dropIfQueueFull) {
-            if (!queue.offer(object)) {
-                System.out.println("reporter queue full");
-                LOGGER.debug("Could not enqueue message.");
-                return false;
-            }
-        } else {
-            try {
-                queue.put(object);
-            } catch (InterruptedException ignore) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     private class SenderThread extends Thread {
 
         private final ReporterEncoder encoder = new ReporterEncoder();
@@ -130,7 +74,7 @@
                 socket = new Socket(serverAddress.getAddress(), serverAddress.getPort());
                 os = new DataOutputStream(socket.getOutputStream());
             } catch (IOException ex) {
-                LOGGER.warn("Could not connect.", ex);
+                logger.warn("Could not connect.", ex);
             }
         }
 
@@ -190,19 +134,19 @@
                         }
                     }
                 } catch (InterruptedException ex) {
-                    LOGGER.trace("Queue.take() was interrupted.");
+                    logger.trace("Queue.take() was interrupted.");
                 } catch (AvroRemoteException ex) {
-                    LOGGER.warn("Could not send.", ex);
+                    logger.warn("Could not send.", ex);
                 } catch (IOException ex) {
-                    LOGGER.warn("Could not connect.", ex);
+                    logger.warn("Could not connect.", ex);
                 } catch (Exception ex) {
-                    LOGGER.error(ex);
+                    logger.error(ex);
                 }
 
             }
 
             disconnect();
-            LOGGER.debug("Sender stopped.");
+            logger.debug("Sender stopped.");
         }
 
     }
--- a/stress-tester-reporter/src/main/java/com/passus/st/utils/CliUtils.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/utils/CliUtils.java	Thu Sep 21 13:49:27 2017 +0200
@@ -32,4 +32,9 @@
         HelpFormatter formatter = new HelpFormatter();
         formatter.printHelp(syntax, "---", options, "===");
     }
+
+    /** Prints the option list under a generic "[options]" usage line. */
+    public static void printHelp(Options options) {
+        new HelpFormatter().printHelp("[options]", "description", options, "");
+    }
 }
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java	Thu Sep 21 13:49:27 2017 +0200
@@ -7,6 +7,7 @@
 import com.passus.st.reporter.trx.ServerMain;
 import com.passus.st.reporter.trx.SocketReporterClient;
 import java.io.DataOutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.HashMap;
 import java.util.Map;
@@ -22,7 +23,7 @@
     public static void main(String[] args) throws Exception {
         ReporterImpl reporter = new ReporterImpl();
         reporter.setVerbose(true);
-        ServerMain server = new ServerMain("localhost", ServerMain.PORT, 100, reporter);
+        ServerMain server = new ServerMain(new InetSocketAddress(ServerMain.PORT), reporter);
         server.start();
         System.out.println("server started");
 
@@ -57,7 +58,7 @@
     }
 
     static void client() throws Exception {
-        SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS);
+        SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS, 1);
         rc.start();
         rc.send(METRIC);
         rc.waitForEmptyQueue();
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Thu Sep 21 13:49:27 2017 +0200
@@ -157,10 +157,13 @@
                 .hasArg().argName("ip")
                 .build()
         );
-
         options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.")
                 .hasArg(false)
                 .build());
+        options.addOption(option("nrt", "newReporterThreads").desc("Number of sending threads. (range 1 - 8, default 2)")
+                .hasArg().argName("threads")
+                .build()
+        );
 
         options.addOption(option("wf", "writeFile").desc("Write result to file.")
                 .hasArg().argName("file").optionalArg(true)
@@ -310,7 +313,8 @@
                 InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri"));
                 InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
                 if (cl.hasOption("nrp")) {
-                    reporterClient = new SocketReporterClient(socketAddr);
+                    int threads = Integer.parseInt(cl.getOptionValue("nrt", "2"));
+                    reporterClient = new SocketReporterClient(socketAddr, threads);
                 } else {
                     reporterClient = new AvroRpcReporterClient(socketAddr);
                 }
@@ -418,6 +422,7 @@
             if (reporterClient != null) {
                 reporterClient.waitForEmptyQueue();
                 reporterClient.stop();
+                System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages());
             }
             if (summaryListener != null) {
                 summaryListener.close();
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu Sep 21 11:56:09 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu Sep 21 13:49:27 2017 +0200
@@ -43,8 +43,6 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import static com.passus.st.Main.printMetrics;
-import static com.passus.st.utils.CliUtils.option;
 
 /**
  *
@@ -93,7 +91,7 @@
             Log4jConfigurationFactory.enableFactory(logLevel);
 
             InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri"));
-            SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111));
+            SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111), 2);
             reporterClient.start();
 
             LocalHandler lh = new LocalHandler(reporterClient, cl.hasOption("ps"));