changeset 628:ace9b1e69e96

NioEmitterWorker2
author Devel 2
date Tue, 24 Oct 2017 15:26:49 +0200
parents 6c8ad9bd2767
children 290de0f2a667
files stress-tester/src/main/java/com/passus/st/Main2.java stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterWorker2Test.java
diffstat 11 files changed, 2263 insertions(+), 627 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/Main2.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,473 @@
+package com.passus.st;
+
+import com.passus.commons.ConversionException;
+import com.passus.commons.metric.MapMetric;
+import com.passus.commons.metric.Metric;
+import com.passus.commons.service.Registry;
+import com.passus.config.Configuration;
+import com.passus.config.YamlConfigurationReader;
+import com.passus.config.validation.Errors;
+import static com.passus.config.validation.ErrorsUtils.objectErrorToString;
+import com.passus.config.validation.ObjectError;
+import com.passus.net.PortRangeSet;
+import com.passus.st.client.MemoryEventsCache;
+import com.passus.st.client.http.DumperHttpClientListener;
+import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.ReporterRemoteDestination;
+import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher;
+import com.passus.st.client.http.ReporterDestination;
+import com.passus.st.client.http.ReporterFileDestination;
+import com.passus.st.client.http.SummaryHttpClientListener;
+import com.passus.st.client.http.WriterHttpClientListener;
+import com.passus.st.client.http.filter.HttpFiltersConfigurator;
+import com.passus.st.emitter.PassThroughSessionMapper;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.emitter.nio.NioEmitterWorker2;
+import com.passus.st.metric.FileMetricsCollectionAppender;
+import com.passus.st.metric.ScheduledMetricsCollector;
+import com.passus.st.metric.SummrizeMetricsCollectionHandler;
+import com.passus.st.reporter.ReporterClient;
+import com.passus.st.reporter.server.AvroRpcReporterClient;
+import com.passus.st.reporter.trx.SocketReporterClient;
+import com.passus.st.source.PcapSessionEventSource;
+import com.passus.st.utils.PeriodFormatter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import static com.passus.st.utils.CliUtils.option;
+
+/**
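+ * Command-line entry point that replays pcap files through an {@link HttpClient} whose
+ * {@link NioEmitter} is created with {@link NioEmitterWorker2} workers.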
+ * @author Mirosław Hawrot
+ */
+public class Main2 {
+
+    private final SummrizeMetricsCollectionHandler summMetricsHandler = new SummrizeMetricsCollectionHandler();
+    private long startTime;
+
+    /**
+     * Prints the command-line usage text for the given options.
+     *
+     * @param options options to describe in the usage text
+     */
+    static void printHelp(Options options) {
+        new HelpFormatter().printHelp("[options] <pcap>", "description", options, "");
+    }
+
+    /**
+     * Prints an error message to standard error and terminates the JVM with exit status 1.
+     *
+     * @param msg message to print
+     */
+    static void printError(String msg) {
+        // println (not print) so the message is newline-terminated before the process exits.
+        System.err.println(msg);
+        System.exit(1);
+    }
+
+    /**
+     * Prints the elapsed time and every attribute of the given metrics to standard output.
+     * Nothing is printed while {@code startTime} is 0.
+     *
+     * @param metrics metrics to print
+     * @param startTime start timestamp in milliseconds; 0 suppresses the report
+     */
+    static void printMetrics(List<Metric> metrics, long startTime) {
+        if (startTime == 0) {
+            return;
+        }
+
+        long elapsed = System.currentTimeMillis() - startTime;
+        // Callers of this method are serialized on System.out so their reports do not interleave.
+        synchronized (System.out) {
+            System.out.println("");
+            try {
+                System.out.println("Elapsed time: " + PeriodFormatter.INSTANCE.reverseTransform(elapsed) + ".");
+            } catch (ConversionException e) {
+                // Formatting failed: fall back to raw milliseconds instead of dropping the line.
+                System.out.println("Elapsed time: " + elapsed + " ms.");
+            }
+            System.out.println("Metrics:");
+
+            for (Metric metric : metrics) {
+                System.out.println(metric.getName() + ":");
+                for (Map.Entry<String, Serializable> entry : metric.getAttributesValue().entrySet()) {
+                    System.out.printf("%24s: %s%n", entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * Prints the metrics currently held by the summarizing handler, using this instance's
+     * start time.
+     */
+    void printMetrics() {
+        printMetrics(summMetricsHandler.getMetrics(), startTime);
+    }
+
+    /**
+     * Returns the metrics currently held by the summarizing handler.
+     *
+     * @return metrics as reported by the handler
+     */
+    List<Metric> getMetrics() {
+        List<Metric> current = this.summMetricsHandler.getMetrics();
+        return current;
+    }
+
+    /**
+     * Parses the command line, wires the emitter, HTTP client, listeners, reporter and metrics
+     * collector together, starts one pcap event source per positional argument and blocks until
+     * the client has finished. Parse errors print the usage text; any other exception is
+     * printed to standard error.
+     *
+     * @param args command-line arguments: options followed by at least one pcap file
+     */
+    void start(String... args) {
+        AppUtils.registerAll();
+        final Options options = buildOptions();
+        // Declared outside the try block so the finally clause can always stop the metrics printer.
+        ScheduledExecutorService executor = null;
+
+        try {
+            CommandLine cl = new DefaultParser().parse(options, args);
+            String[] clArgs = cl.getArgs();
+            if (clArgs.length < 1) {
+                System.err.println("At least one pcap file required.");
+                printHelp(options);
+                return;
+            }
+
+            String logLevel = "error";
+            if (cl.hasOption("l")) {
+                logLevel = cl.getOptionValue("l");
+            }
+            if (!logLevel.equals("default")) {
+                Log4jConfigurationFactory.enableFactory(logLevel);
+            }
+
+            SessionMapper mapper;
+            if (cl.hasOption("mr")) {
+                RuleBasedSessionMapper constMapper = new RuleBasedSessionMapper();
+                String[] rules = cl.getOptionValues("mr");
+                for (String rule : rules) {
+                    try {
+                        constMapper.addRule(rule);
+                    } catch (Exception e) {
+                        printError("Invalid mapper rule '" + rule + "'.");
+                    }
+                }
+
+                mapper = constMapper;
+            } else {
+                mapper = new PassThroughSessionMapper();
+            }
+
+            NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class);
+            emitter.setSessionMapper(mapper);
+            emitter.setCollectMetrics(true);
+
+            HttpClient client = new HttpClient(emitter);
+            client.setCollectMetrics(true);
+            client.setConnectPartialSession(cl.hasOption("ps"));
+            client.setWokerType(cl.getOptionValue("wt", "synch"));
+            // The first listener callback marks the start of the replay.
+            client.addListener((request, response, context) -> {
+                if (startTime == 0) {
+                    startTime = System.currentTimeMillis();
+                }
+            });
+            if (cl.hasOption("pr")) {
+                if (clArgs.length != 1) {
+                    throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file.");
+                }
+
+                int parallelReplays = Integer.parseInt(cl.getOptionValue("pr"));
+                if (parallelReplays > 0 && parallelReplays <= 100) {
+                    emitter.setMaxThreads(parallelReplays);
+                    client.setWorkersNum(parallelReplays);
+                } else {
+                    throw new IllegalArgumentException("Parameter \"parallelReplays\" should be in range 1-100.");
+                }
+            } else {
+                client.setWorkersNum(clArgs.length);
+                client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher());
+            }
+            emitter.start();
+
+            if (cl.hasOption("rs")) {
+                float speed = Float.parseFloat(cl.getOptionValue("rs"));
+                if (speed > .1f && speed < 100f) {
+                    client.setSleepFactor(1f / speed);
+                } else if (speed != 0f) {
+                    throw new IllegalArgumentException(
+                            "Parameter \"replaySpeed\" should be 0 or greater than 0.1 and less than 100.");
+                }
+            }
+
+            if (cl.hasOption("wf")) {
+                WriterHttpClientListener writerListener;
+                String value = cl.getOptionValue("wf");
+                if (value != null && !value.equals("-")) {
+                    writerListener = WriterHttpClientListener.createFileListener(new File(value));
+                } else {
+                    writerListener = WriterHttpClientListener.createStdoutListener();
+                }
+                writerListener.write(Arrays.asList(args).toString());
+                writerListener.write("\n");
+                client.addListener(writerListener);
+            }
+
+            SummaryHttpClientListener summaryListener = null;
+            if (cl.hasOption("wd")) {
+                File outDir = new File(cl.getOptionValue("wd"));
+                DumperHttpClientListener dumper = new DumperHttpClientListener(outDir);
+                dumper.setRequestsOnly(cl.hasOption("wdro"));
+                dumper.setDecodeContent(true);
+                File directory = dumper.getDirectory();
+                if (directory.isDirectory() || directory.mkdirs()) {
+                    client.addListener(dumper);
+                    summaryListener = new SummaryHttpClientListener(new File(directory, "summary.txt"));
+                    // Record the command line next to the dumped messages.
+                    File file = new File(directory, "cmd." + new File(clArgs[0]).getName() + ".txt");
+                    String cmd = String.join(" ", args);
+                    try (FileOutputStream fos = new FileOutputStream(file)) {
+                        IOUtils.write(cmd, fos);
+                    }
+                    client.addListener(summaryListener);
+                } else {
+                    throw new Exception("Cannot create directory: " + directory.getAbsolutePath());
+                }
+            }
+
+            ScheduledMetricsCollector collector = new ScheduledMetricsCollector();
+
+            ReporterClient reporterClient = null;
+            if (cl.hasOption("ri")) {
+                int port = 11111;
+                InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri"));
+                InetSocketAddress socketAddr = new InetSocketAddress(addr, port);
+                if (cl.hasOption("nrp")) {
+                    int threads = Integer.parseInt(cl.getOptionValue("nrt", "2"));
+                    reporterClient = new SocketReporterClient(socketAddr, threads);
+                } else {
+                    reporterClient = new AvroRpcReporterClient(socketAddr);
+                }
+                reporterClient.start();
+
+                ReporterDestination reporterDestination = new ReporterRemoteDestination(reporterClient);
+                Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination);
+                client.addListener(reporterDestination);
+                collector.addHandler(reporterDestination);
+            } else if (cl.hasOption("rd")) {
+                // TODO: refactor
+                ReporterDestination reporterDestination = new ReporterFileDestination(cl.getOptionValue("rd"));
+                Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination);
+                client.addListener(reporterDestination);
+                collector.addHandler(reporterDestination);
+            }
+
+            if (cl.hasOption("ff")) {
+                File filtersFile = new File(cl.getOptionValue("ff"));
+                Configuration config = YamlConfigurationReader.readFromFile(filtersFile);
+                HttpFiltersConfigurator configurator = new HttpFiltersConfigurator(client);
+                Errors errors = new Errors();
+                configurator.configure(config, errors);
+                if (errors.getErrorCount() != 0) {
+                    System.out.println("Error in file '" + filtersFile.getAbsolutePath() + "'.");
+                    for (ObjectError error : errors.getAllErrors()) {
+                        System.out.println("\t" + objectErrorToString(error));
+                    }
+                    System.exit(1);
+                }
+            }
+
+            client.start();
+
+            PcapSessionEventSource[] eventSrcs = new PcapSessionEventSource[clArgs.length];
+
+            for (int i = 0; i < clArgs.length; i++) {
+                PcapSessionEventSource eventSrc = new PcapSessionEventSource();
+                eventSrc.setPcapFile(clArgs[i]);
+                eventSrc.setAllowPartialSession(cl.hasOption("ps"));
+                eventSrc.setCollectMetrics(true);
+                eventSrcs[i] = eventSrc;
+            }
+
+            if (cl.hasOption("hp")) {
+                // The same port list replaces the port ranges of every source.
+                String[] ports = cl.getOptionValues("hp");
+                for (PcapSessionEventSource eventSrc : eventSrcs) {
+                    PortRangeSet portsRanges = eventSrc.getPortsRange();
+                    portsRanges.clear();
+                    for (String port : ports) {
+                        portsRanges.add(port);
+                    }
+                }
+            }
+
+            int loops = Integer.parseInt(cl.getOptionValue("lp", "1"));
+            if (loops <= 0) {
+                throw new IllegalArgumentException("Loop should be greater than zero.");
+            }
+
+            if (cl.hasOption("wm")) {
+                File metricsFile = new File(cl.getOptionValue("wm"));
+                if (!metricsFile.exists() && !metricsFile.createNewFile()) {
+                    printError("Unable to create metrics file '" + metricsFile + "'.");
+                }
+
+                FileMetricsCollectionAppender fileAppender = new FileMetricsCollectionAppender();
+                fileAppender.setFileName(metricsFile.getAbsolutePath());
+                fileAppender.start();
+
+                collector.addHandler(fileAppender);
+            }
+
+            collector.addHandler(summMetricsHandler);
+            for (PcapSessionEventSource eventSrc : eventSrcs) {
+                collector.register(eventSrc);
+            }
+
+            collector.register(emitter);
+            collector.register(client);
+            collector.start();
+
+            if (cl.hasOption("ca")) {
+                Set<String> sourcesName = new HashSet<>();
+                for (PcapSessionEventSource eventSrc : eventSrcs) {
+                    sourcesName.add(eventSrc.getName());
+                }
+
+                // Sources feed the cache; the cache is sent to the client after await() returns.
+                MemoryEventsCache cache = new MemoryEventsCache(client, sourcesName);
+                for (PcapSessionEventSource eventSrc : eventSrcs) {
+                    eventSrc.setHandler(cache);
+                    eventSrc.start();
+                }
+                cache.setLoop(loops);
+                cache.await();
+                cache.send();
+            } else {
+                for (PcapSessionEventSource eventSrc : eventSrcs) {
+                    eventSrc.setHandler(client);
+                    eventSrc.setLoops(loops);
+                    eventSrc.start();
+                }
+            }
+
+            // Print the collected metrics every 5 seconds while the replay is running.
+            executor = Executors.newSingleThreadScheduledExecutor();
+            executor.scheduleAtFixedRate(this::printMetrics, 5, 5, TimeUnit.SECONDS);
+
+            client.join();
+
+            // startTime stays 0 when the listener was never called; report a zero duration
+            // then instead of the time elapsed since the epoch.
+            long duration = startTime == 0 ? 0 : System.currentTimeMillis() - startTime;
+            System.out.println("Broadcast finished. Duration " + PeriodFormatter.INSTANCE.reverseTransform(duration) + ".");
+
+            collector.collect();
+
+            executor.shutdownNow();
+            for (PcapSessionEventSource eventSrc : eventSrcs) {
+                eventSrc.stop();
+            }
+
+            client.stop();
+            emitter.stop();
+            if (reporterClient != null) {
+                reporterClient.send(new MapMetric("endOfReplay", Collections.emptyMap()));
+                reporterClient.waitForEmptyQueue();
+                reporterClient.stop();
+                System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages());
+            }
+            if (summaryListener != null) {
+                summaryListener.close();
+            }
+
+            collector.flush(true);
+
+            printMetrics();
+        } catch (ParseException e) {
+            System.out.println(e.getMessage());
+            printHelp(options);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        } finally {
+            if (executor != null) {
+                // Redundant after the normal shutdown above; on the error path it stops the
+                // printer thread, which would otherwise keep running after start() returns.
+                executor.shutdownNow();
+            }
+            AppUtils.unregisterAll();
+        }
+    }
+
+    /**
+     * Builds the set of command-line options understood by {@link #start(String...)}.
+     *
+     * @return options definition
+     */
+    private static Options buildOptions() {
+        Options options = new Options();
+
+        options.addOption(option("l", "logLevel").desc("Log level.")
+                .hasArg().argName("level")
+                .build());
+
+        options.addOption(option("ff", "filtersFile").desc("Filters file.")
+                .hasArg().argName("file")
+                .build());
+
+        options.addOption(option("mr", "mapperRule").desc("Session mapper rule.")
+                .hasArg().argName("rule")
+                .build());
+
+        options.addOption(option("ps", "allowPartialSession").desc("Allow partial sessions.")
+                .hasArg(false)
+                .build());
+
+        options.addOption(option("hp", "httpPorts").desc("Specify HTTP ports in input file (default: 80, 8080)")
+                .hasArg().argName("ports")
+                .build());
+
+        options.addOption(option("rs", "replaySpeed").desc("Speedup factor (default 0 - top speed)")
+                .hasArg().argName("speed")
+                .build());
+
+        options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays. Works only for one pcap file.")
+                .hasArg().argName("replays")
+                .build());
+
+        options.addOption(option("ca", "cache").desc("Cache (and preprocess) input file.")
+                .hasArg(false)
+                .build());
+
+        options.addOption(option("lp", "loops").desc("Loops number (default 1).")
+                .hasArg().argName("loop")
+                .build());
+
+        options.addOption(option("wt", "workerType").desc("Worker type: synch|asynch|parallel (default synch).")
+                .hasArg().argName("type")
+                .build());
+
+        // The argument is a directory (it is passed to ReporterFileDestination), not an IP address.
+        options.addOption(option("rd", "reporterDirectory").desc("Reporter directory.")
+                .hasArg().argName("directory")
+                .build());
+
+        options.addOption(option("ri", "reporterIp").desc("Reporter ip address.")
+                .hasArg().argName("ip")
+                .build());
+
+        options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.")
+                .hasArg(false)
+                .build());
+
+        options.addOption(option("nrt", "newReporterThreads").desc("Number of sending threads. (range 1 - 8, default 2)")
+                .hasArg().argName("threads")
+                .build());
+
+        options.addOption(option("wf", "writeFile").desc("Write result to file.")
+                .hasArg().argName("file").optionalArg(true)
+                .build());
+
+        options.addOption(option("wd", "writeDirectory").desc("Write HTTP messages to separate files.")
+                .hasArg().argName("directory")
+                .build());
+
+        options.addOption(option("wdro", "writeDirectoryRequestsOnly").desc("Write only HTTP requests.")
+                .hasArg(false)
+                .build());
+
+        options.addOption(option("wm", "writeMetrics").desc("Write metrics to file.")
+                .hasArg().argName("file")
+                .build());
+
+        return options;
+    }
+
+    /**
+     * Program entry point: creates a {@code Main2} instance and runs it with the given arguments.
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String... args) {
+        Main2 app = new Main2();
+        app.start(args);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Tue Oct 17 13:10:50 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Tue Oct 24 15:26:49 2017 +0200
@@ -28,7 +28,7 @@
 
     public static final String TYPE = "parallel";
 
-    public static final int DEFAULT_MAX_SENT_REQUESTS = 1024;
+    public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
 
     private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
 
@@ -193,6 +193,32 @@
         makeFirst(flowContext);
     }
 
+    @Override
+    public void handle(Event event) {
+        Event newEvent = null;
+        switch (event.getType()) {
+            case HttpSessionPayloadEvent.TYPE:
+                semaphore.acquireUninterruptibly();
+                newEvent = eventInstanceForWorker(event);
+                break;
+            case SessionStatusEvent.TYPE:
+            case DataLoopEnd.TYPE:
+            case DataEnd.TYPE:
+                newEvent = event;
+        }
+
+        if (newEvent != null) {
+            synchronized (lock) {
+                try {
+                    eventsQueue.put(newEvent);
+                    lock.notifyAll();
+                } catch (Exception e) {
+                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
+                }
+            }
+        }
+    }
+
     private void processEvent(Event event) {
         if (event instanceof SessionEvent) {
             switch (event.getType()) {
@@ -258,32 +284,6 @@
     }
 
     @Override
-    public void handle(Event event) {
-        Event newEvent = null;
-        switch (event.getType()) {
-            case HttpSessionPayloadEvent.TYPE:
-                semaphore.acquireUninterruptibly();
-                newEvent = eventInstanceForWorker(event);
-                break;
-            case SessionStatusEvent.TYPE:
-            case DataLoopEnd.TYPE:
-            case DataEnd.TYPE:
-                newEvent = event;
-        }
-
-        if (newEvent != null) {
-            synchronized (lock) {
-                try {
-                    eventsQueue.put(newEvent);
-                    lock.notifyAll();
-                } catch (Exception e) {
-                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    @Override
     public void run() {
         synchronized (lock) {
             working = true;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,73 @@
+package com.passus.st.client.http;
+
+import com.passus.net.http.HttpHeaders;
+import com.passus.net.http.HttpRequest;
+import com.passus.st.client.DataEvents;
+import com.passus.st.client.Event;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+/**
+ * Skeleton of a stresser-style HTTP client worker. In its current state it only strips the
+ * Keep-Alive header from incoming requests and stops reporting itself as working once a
+ * data-loop-end event arrives; nothing is emitted yet.
+ *
+ * @author Mirosław Hawrot
+ */
+public class HttpStresserWorker extends HttpClientWorker {
+
+    // NOTE(review): maxRequests and sentRequests are not used anywhere in this class yet.
+    private int maxRequests = -1;
+
+    private int sentRequests = 0;
+
+    private boolean working = true;
+
+    /**
+     * Creates the worker; all arguments are passed to the superclass constructor.
+     *
+     * @param emitter emitter handed to the superclass
+     * @param name worker name
+     * @param index worker index
+     */
+    public HttpStresserWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    /**
+     * @return {@code true} until a data-loop-end event has been handled
+     */
+    @Override
+    public boolean isWorking() {
+        return working;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int activeConnections() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Prepares a request for sending. Currently this only removes the Keep-Alive header.
+     *
+     * @param req request to prepare
+     */
+    private void send(HttpRequest req) {
+        req.getHeaders().delete(HttpHeaders.KEEP_ALIVE);
+
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void close(SessionInfo session) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Marks the worker as no longer working.
+     */
+    private void waitAndClose() {
+        working = false;
+    }
+
+    /**
+     * Handles a single event: a data-loop-end event stops the worker, an HTTP payload event
+     * has its request passed to {@link #send(HttpRequest)}, every other event is ignored.
+     *
+     * @param event event to handle
+     */
+    @Override
+    public void handle(Event event) {
+        if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+            waitAndClose();
+            // Return here: falling through would cast a non-payload event to
+            // HttpSessionPayloadEvent below.
+            return;
+        }
+
+        if (event.getType() != HttpSessionPayloadEvent.TYPE) {
+            return;
+        }
+
+        HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
+        send(payloadEvent.getRequest());
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void run() {
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Tue Oct 17 13:10:50 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Tue Oct 24 15:26:49 2017 +0200
@@ -4,7 +4,6 @@
 import com.passus.net.IpAddress;
 import com.passus.net.SocketAddress;
 import com.passus.net.session.SessionBean;
-import com.passus.net.session.SessionUtils;
 import java.text.ParseException;
 import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Tue Oct 17 13:10:50 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Tue Oct 24 15:26:49 2017 +0200
@@ -18,7 +18,7 @@
  */
 public class NioChannelContext implements ChannelContext {
 
-    private final NioEmitterWorker worker;
+    private final NioDefaultEmitterWorker worker;
 
     private final SessionInfo sessionInfo;
 
@@ -32,7 +32,7 @@
 
     private SelectionKey key;
 
-    public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+    public NioChannelContext(NioDefaultEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         this.worker = worker;
         this.channel = channel;
         this.remoteAddress = remoteAddress;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,113 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.SessionInfo;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@link ChannelContext} used together with {@link NioEmitterWorker2}. Written data is only
+ * appended to an in-memory queue of buffers; {@link #flush()} and {@link #close()} are
+ * delegated to the worker together with this context's selection key.
+ * <p>
+ * NOTE(review): the queue is a plain {@link LinkedList}; confirm that the writers and the
+ * worker never touch it concurrently without external synchronization.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioChannelContext2 implements ChannelContext {
+
+    private final NioEmitterWorker2 worker;
+
+    private final SessionInfo sessionInfo;
+
+    private final SocketChannel channel;
+
+    private final Queue<ByteBuffer> dataQueue;
+
+    private SocketAddress localAddress;
+
+    private SocketAddress remoteAddress;
+
+    private SelectionKey key;
+
+    /**
+     * To be removed.
+     */
+    public long regTime;
+
+    /**
+     * Creates a context for the given channel with an empty data queue.
+     *
+     * @param worker worker that flushes and closes this context
+     * @param channel underlying socket channel
+     * @param remoteAddress address returned by {@link #getRemoteAddress()}
+     * @param sessionInfo session returned by {@link #getSessionInfo()}
+     */
+    public NioChannelContext2(NioEmitterWorker2 worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+        this.worker = worker;
+        this.channel = channel;
+        this.remoteAddress = remoteAddress;
+        this.sessionInfo = sessionInfo;
+        this.dataQueue = new LinkedList<>();
+    }
+
+    /**
+     * @return the live queue of buffers that have been written but not yet removed from it
+     */
+    Queue<ByteBuffer> dataQueue() {
+        return dataQueue;
+    }
+
+    /**
+     * Sets the selection key that is passed to the worker on flush and close.
+     *
+     * @param key selection key of this context's channel
+     */
+    void selectionKey(SelectionKey key) {
+        this.key = key;
+    }
+
+    /**
+     * Appends a buffer to the end of the data queue.
+     *
+     * @param buffer buffer to enqueue
+     */
+    private void addToQueue(ByteBuffer buffer) {
+        dataQueue.add(buffer);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel.isConnected();
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return channel.isConnectionPending();
+    }
+
+    /**
+     * Queues a slice of {@code data}. The array is wrapped, not copied.
+     */
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        addToQueue(ByteBuffer.wrap(data, offset, length));
+    }
+
+    /**
+     * Queues the NIO buffer view of {@code data}.
+     */
+    @Override
+    public void write(ByteBuff data) throws IOException {
+        addToQueue(data.toNioByteBuffer());
+    }
+
+    @Override
+    public void flush() {
+        worker.flush(key);
+    }
+
+    @Override
+    public void close() throws IOException {
+        worker.doClose(key);
+    }
+
+    /**
+     * Returns the local address of the channel, resolving and caching it on first success.
+     *
+     * @return local address, or {@code null} when it is not available
+     */
+    @Override
+    public SocketAddress getLocalAddress() {
+        if (localAddress == null) {
+            try {
+                // Query the channel once; fully qualified because SocketAddress in this file
+                // refers to com.passus.net.SocketAddress.
+                java.net.SocketAddress jdkAddress = channel.getLocalAddress();
+                if (jdkAddress != null) {
+                    localAddress = AddressUtils.jdkSocketToSocketAddress(jdkAddress);
+                }
+            } catch (Exception ignored) {
+                // Best effort: on any failure the address stays unresolved and null is returned.
+            }
+        }
+
+        return localAddress;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    @Override
+    public SessionInfo getSessionInfo() {
+        return sessionInfo;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,518 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricsContainer;
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioDefaultEmitterWorker extends NioEmitterWorker {
+
+    private final Selector selector;
+
+    private volatile boolean working = false;
+
+    private final Queue<Task> tasks = new ConcurrentLinkedQueue<>();
+
+    public NioDefaultEmitterWorker(int index) throws IOException { // public (int) constructor: NioEmitter creates workers reflectively through getConstructor(Integer.TYPE)
+        super(index);
+        selector = Selector.open(); // one selector per worker, opened eagerly
+    }
+
+    @Override
+    public void setWorking(boolean working) {
+        this.working = working;
+        if (this.working && !working) {
+            try {
+                selector.close();
+            } catch (IOException ex) {
+                logger.warn(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    public boolean isWorking() { // reads the volatile flag that controls the loop in run()
+        return working;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) { // publishes the current metric snapshot, then starts a fresh measurement window
+        if (collectMetrics) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset(); // NOTE(review): some call sites in this class lock 'metric' before updating it, this method does not -- confirm whether that is intended
+        }
+    }
+
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { // only enqueues the request; the connect itself is performed by doConnect() on the worker thread
+        tasks.add(new ConnectTask(sessionInfo, handler));
+        selector.wakeup(); // let a blocked select() return so run() picks the task up promptly
+    }
+
+    void flush(SelectionKey key) { // only enqueues the request; run() reacts by calling setOpWrite(key)
+        tasks.add(new FlushTask(key));
+        selector.wakeup(); // let a blocked select() return so run() picks the task up promptly
+    }
+
+    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { // executes a queued ConnectTask: map the session, open a channel, register it, start the connect
+        try {
+            ConnectionParams connParams = sessionMapper.map(sessionInfo);
+            if (connParams == null) { // no mapping: count the error, tell the handler the session is invalid, give up
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Unable to map session '{}'.", sessionInfo);
+                }
+
+                if (collectMetrics) {
+                    synchronized (metric) {
+                        metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
+                    }
+                }
+
+                try {
+                    handler.sessionInvalidated(sessionInfo);
+                } catch (Exception e) { // a failing handler callback is only logged
+                    logger.debug(e.getMessage(), e);
+                }
+
+                return;
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+            }
+
+            SocketChannel channel = SocketChannel.open();
+            channel.configureBlocking(false); // a channel must be non-blocking to be registered with a selector
+
+            SocketAddress bindAddress = connParams.getBindAddress();
+            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { // NOTE(review): binds only when the mapped address EQUALS ANY_SOCKET, so any other mapped bind address is ignored -- looks inverted ("!ANY_SOCKET.equals"); confirm against SessionMapper
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) { // no mapped remote: fall back to the session's own destination
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); // the KeyContext travels as the key attachment
+            try {
+                handler.channelRegistered(channelContext); // NOTE(review): runs before channelContext.selectionKey(key) below, so the context has no key yet during this callback
+            } catch (Exception ex) {
+                doCatchException(key, ex); // reported, but the connect attempt still proceeds
+            }
+
+            channelContext.selectionKey(key);
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); // non-blocking: completion is handled in doFinishConnect()
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+                return; // NOTE(review): the channel stays open and registered on this path -- possible socket leak; confirm who closes it
+            }
+
+            selector.wakeup();
+        } catch (Exception e) { // anything else (open/bind/register failures): metric plus error log. NOTE(review): a channel opened above is not closed here
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * Completes a pending non-blocking connect for a key selected with OP_CONNECT.
+     * On success the handler is told the channel is active and OP_READ interest is
+     * enabled; on failure or timeout the error is reported through
+     * doCatchException, the key is cancelled and the connection-error metric is
+     * incremented.
+     *
+     * @param key the connectable key; its attachment is the KeyContext
+     */
+    private void doFinishConnect(SelectionKey key) {
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        try {
+            long connStart = System.currentTimeMillis();
+            boolean timeouted = false;
+            // Polls on the worker thread until the connect completes or the timeout elapses.
+            while (!channel.finishConnect()) {
+                long now = System.currentTimeMillis();
+                // Time out once the elapsed time reaches the limit. The comparison used
+                // to be '<', which declared a timeout on the very first unfinished poll.
+                if (now - connStart >= connectionTimeout) {
+                    timeouted = true;
+                    break;
+                }
+            }
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+
+            if (timeouted) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
+                }
+
+                // Routed through the catch block below so timeouts and I/O failures share one path.
+                throw new ConnectException("Connection timed out.");
+
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            key.cancel();
+            if (collectMetrics) {
+                metric.incConnectionsErrors();
+            }
+
+            return;
+        }
+
+        try {
+            if (collectMetrics) {
+                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
+                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
+            }
+
+            keyContext.handler.channelActive(keyContext.channelContext);
+            setOpRead(key);
+        } catch (Exception ex) {
+            logger.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Handles a write-ready key: drains the channel context's queue of pending
+     * buffers into the socket, notifies the handler before and after, then
+     * enables OP_READ and clears OP_WRITE on the key. Any write failure is
+     * reported through doCatchException and the channel is closed.
+     *
+     * @param key the writable key; its attachment is the KeyContext
+     */
+    private void doWrite(SelectionKey key) {
+        SocketChannel socketChannel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing ({} -> {}).",
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        // Guarded like the other handler callbacks in this class: an exception
+        // escaping from here would propagate out of run() and end the worker thread.
+        try {
+            keyContext.handler.dataWriteStart(keyContext.channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
+        int written = 0;
+        try {
+            ByteBuffer buffer;
+            while (!queue.isEmpty()) {
+                buffer = queue.poll();
+                // NOTE(review): on a non-blocking channel write() may return 0 when the
+                // socket send buffer is full; this loop then spins on the worker thread
+                // until space frees up.
+                while (buffer.hasRemaining()) {
+                    int res = socketChannel.write(buffer);
+
+                    // NOTE(review): SocketChannel.write is documented to return the number
+                    // of bytes written, possibly zero; this branch looks unreachable.
+                    if (res == -1) {
+                        doClose(key);
+                        return;
+                    }
+
+                    if (collectMetrics) {
+                        metric.updateSentBytes(res);
+                    }
+
+                    written += res;
+                }
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Written {}B ({} -> {})", written,
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        //TODO Handler operations should go through an Executor
+        try {
+            keyContext.handler.dataWritten(keyContext.channelContext);
+            logger.debug("Write handled.");
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        setOpRead(key);
+        clearOpWrite(key);
+    }
+
+    private void doRead(SelectionKey key) { // handles a read-ready key: drains the socket into one ByteBuff and passes it to the handler
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reading ({} -> {})",
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        ByteBuffer buffer = keyContext.buffer; // per-key scratch buffer, reused across reads
+
+        buffer.clear();
+
+        ByteBuff buff = new HeapByteBuff(); // accumulates everything read during this call
+        int totalReaded = 0;
+        int readed;
+        try {
+            while ((readed = channel.read(buffer)) > 0) { // stops on 0 (nothing more available now) or -1 (end of stream)
+                buffer.flip();
+                buff.append(buffer.array(), buffer.position(), buffer.limit()); // after flip() position is 0, so limit() equals the number of bytes just read -- NOTE(review): third argument is presumably a length; confirm against ByteBuff.append
+                buffer.clear();
+                totalReaded += readed;
+
+                if (collectMetrics) {
+                    metric.updateReceivedBytes(readed);
+                }
+            }
+        } catch (IOException e) { // read failure: report to the handler and close the channel
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Readed {}B ({} -> {})", totalReaded,
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        if (totalReaded > 0) { // deliver data received before a possible end of stream
+            try {
+                keyContext.handler.dataReceived(keyContext.channelContext, buff);
+                logger.debug("Read handled.");
+            } catch (Exception e) { // a failing handler callback is only logged
+                logger.debug(e.getMessage(), e);
+            }
+        }
+
+        if (readed == -1) { // peer closed the connection
+            doClose(key);
+            return;
+        }
+
+        keyContext.buffer.flip(); // NOTE(review): the next doRead() starts with buffer.clear(), so this flip appears to have no effect
+    }
+
+    private void doCatchException(SelectionKey key, Throwable cause) { // common error path: debug log, error metric, then handler.errorOccured
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occured. " + cause.getMessage(), cause);
+        }
+
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment(); // handler and channel context travel with the key
+
+        try {
+            keyContext.handler.errorOccured(keyContext.channelContext, cause);
+        } catch (Exception e) { // a failing handler callback is only logged
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    private void doClose(SelectionKey key) { // closes the channel and walks the handler through channelInactive -> channelUnregistered
+        if (!key.channel().isOpen()) { // already closed: skip the callbacks, which makes repeated close requests harmless
+            selector.wakeup();
+            return;
+        }
+
+        try {
+            key.channel().close();
+        } catch (Exception e) { // close failures are only logged; the callbacks below still run
+            logger.debug(e.getMessage(), e);
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        try {
+            keyContext.handler.channelInactive(keyContext.channelContext);
+        } catch (Exception e) { // a failing handler callback is only logged
+            logger.debug(e.getMessage(), e);
+        }
+
+        key.cancel(); // drop the registration before reporting "unregistered"
+        try {
+            keyContext.handler.channelUnregistered(keyContext.channelContext);
+        } catch (Exception e) { // a failing handler callback is only logged
+            logger.debug(e.getMessage(), e);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+
+        selector.wakeup();
+    }
+
+    /**
+     * Adds OP_READ to the key's interest set and wakes the selector. Does nothing
+     * for an invalid key or when OP_READ is already requested.
+     *
+     * @param key the key to update
+     */
+    static void setOpRead(SelectionKey key) {
+        // Test the INTEREST set. isReadable() reports the ready set left by the last
+        // select(), which says nothing about what is currently registered.
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_READ) != 0) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+        key.selector().wakeup();
+    }
+
+    /**
+     * Removes OP_READ from the key's interest set and wakes the selector. Does
+     * nothing for an invalid key or when OP_READ is not requested.
+     *
+     * @param key the key to update
+     */
+    static void clearOpRead(SelectionKey key) {
+        // Test the INTEREST set. isReadable() reports the ready set left by the last
+        // select(), so an interested-but-not-ready key would never be cleared.
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_READ) == 0) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+        key.selector().wakeup();
+    }
+
+    /**
+     * Adds OP_WRITE to the key's interest set and wakes the selector. Does nothing
+     * for an invalid key or when OP_WRITE is already requested.
+     *
+     * @param key the key to update
+     */
+    static void setOpWrite(SelectionKey key) {
+        // Test the INTEREST set. isWritable() reports the ready set left by the last
+        // select(); a stale "writable" bit there made this method skip the
+        // registration, so a flush requested right after a write could stall.
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        key.selector().wakeup();
+    }
+
+    /**
+     * Removes OP_WRITE from the key's interest set and wakes the selector. Does
+     * nothing for an invalid key or when OP_WRITE is not requested.
+     *
+     * @param key the key to update
+     */
+    static void clearOpWrite(SelectionKey key) {
+        // Test the INTEREST set. isWritable() reports the ready set left by the last
+        // select(), so an interested-but-not-ready key would never be cleared.
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_WRITE) == 0) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+        key.selector().wakeup();
+    }
+
+    void requestClose(SelectionKey key) { // only enqueues the request; run() performs it by calling doClose(key)
+        tasks.add(new CloseTask(key));
+        key.selector().wakeup(); // let a blocked select() return so run() picks the task up promptly
+    }
+
+    /**
+     * Worker loop: drains the queued tasks, waits on the selector for at most
+     * {@code selectTimeout} and dispatches every selected key to the matching
+     * connect, write or read step. Runs until the working flag is cleared or the
+     * selector is found closed.
+     */
+    @Override
+    public void run() {
+        working = true;
+        while (working) {
+            if (!tasks.isEmpty()) {
+                Task task;
+                while ((task = tasks.poll()) != null) {
+                    if (task.code == Task.CLOSE) {
+                        doClose(((CloseTask) task).key);
+                    } else if (task.code == Task.CONNECT) {
+                        ConnectTask taskConn = (ConnectTask) task;
+                        doConnect(taskConn.sessionInfo, taskConn.handler);
+                    } else if (task.code == Task.FLUSH) {
+                        FlushTask flushTask = (FlushTask) task;
+                        setOpWrite(flushTask.key);
+                    }
+                }
+            }
+
+            // Reset on every pass: a failed select() used to leave the count of the
+            // previous iteration in place.
+            int selected = 0;
+            try {
+                selected = selector.select(selectTimeout);
+            } catch (java.nio.channels.ClosedSelectorException ex) {
+                // setWorking(false) closes the selector from another thread; treat a
+                // closed selector as the signal to stop instead of dying on the exception.
+                break;
+            } catch (IOException ex) {
+                logger.warn(ex.getMessage(), ex);
+            }
+
+            if (selected > 0) {
+                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                while (it.hasNext()) {
+                    SelectionKey key = it.next();
+                    it.remove();
+
+                    if (!key.isValid()) {
+                        continue;
+                    }
+
+                    // One operation per key per pass; anything else still ready is
+                    // reported again by the next select().
+                    if (key.isConnectable()) {
+                        doFinishConnect(key);
+                    } else if (key.isWritable()) {
+                        doWrite(key);
+                    } else if (key.isReadable()) {
+                        doRead(key);
+                    }
+                }
+            }
+        }
+    }
+
+    private static class KeyContext { // attachment stored on every SelectionKey registered by doConnect()
+
+        private final EmitterHandler handler; // receives the lifecycle and data callbacks for this channel
+
+        private final NioChannelContext channelContext; // channel-side state, including the outbound data queue
+
+        private ByteBuffer buffer = ByteBuffer.allocate(1024); // 1024-byte heap scratch buffer for doRead(); heap-allocated, so array() is available
+
+        public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
+            this.channelContext = channelContext;
+            this.handler = handler;
+        }
+
+    }
+
+    private static abstract class Task { // base type of the requests queued for the worker thread; run() dispatches on 'code'
+
+        public static final int CLOSE = 1; // handled by doClose(key)
+        public static final int CONNECT = 2; // handled by doConnect(sessionInfo, handler)
+        public static final int FLUSH = 3; // handled by setOpWrite(key)
+
+        private final int code; // one of the constants above
+
+        public Task(int code) {
+            this.code = code;
+        }
+
+    }
+
+    private final static class ConnectTask extends Task { // queued by connect(); carries the arguments for doConnect()
+
+        private final SessionInfo sessionInfo;
+        private final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
+            super(CONNECT);
+            this.sessionInfo = sessionInfo;
+            this.handler = handler;
+        }
+
+    }
+
+    private final static class CloseTask extends Task { // queued by requestClose(); carries the key for doClose()
+
+        private final SelectionKey key;
+
+        public CloseTask(SelectionKey key) {
+            super(CLOSE);
+            this.key = key;
+        }
+
+    }
+
+    private final static class FlushTask extends Task { // queued by flush(); carries the key whose OP_WRITE interest should be enabled
+
+        private final SelectionKey key;
+
+        public FlushTask(SelectionKey key) {
+            super(FLUSH);
+            this.key = key;
+        }
+
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Tue Oct 17 13:10:50 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Tue Oct 24 15:26:49 2017 +0200
@@ -35,6 +35,17 @@
 
     private long connectionTimeout = 5_000;
 
+    private final Class<? extends NioEmitterWorker> workerClass;
+
+    public NioEmitter() { // default: workers are NioDefaultEmitterWorker instances
+        this(NioDefaultEmitterWorker.class);
+    }
+
+    public NioEmitter(Class<? extends NioEmitterWorker> workerClass) { // workerClass needs a public (int) constructor: workers are created through getConstructor(Integer.TYPE).newInstance(i)
+        Assert.notNull(workerClass, "workerClass");
+        this.workerClass = workerClass;
+    }
+
     public int getMaxThreads() {
         return maxThreads;
     }
@@ -123,7 +134,7 @@
         workers = new NioEmitterWorker[maxThreads];
         for (int i = 0; i < maxThreads; i++) {
             try {
-                NioEmitterWorker worker = new NioEmitterWorker(i);
+                NioEmitterWorker worker = workerClass.getConstructor(Integer.TYPE).newInstance(i);
                 worker.setSessionMapper(sessionMapper);
                 worker.setCollectMetrics(collectMetrics);
                 worker.setConnectionTimeout(connectionTimeout);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Tue Oct 17 13:10:50 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Tue Oct 24 15:26:49 2017 +0200
@@ -1,596 +1,102 @@
-package com.passus.st.emitter.nio;
-
-import com.passus.st.emitter.SessionMapper;
-import com.passus.commons.Assert;
-import com.passus.data.ByteBuff;
-import com.passus.data.HeapByteBuff;
-import com.passus.net.SocketAddress;
-import com.passus.net.utils.AddressUtils;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.EmitterMetric;
-import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
-import com.passus.st.emitter.SessionMapper.ConnectionParams;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.metric.MetricSource;
-import com.passus.st.metric.MetricsContainer;
-import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class NioEmitterWorker extends Thread implements MetricSource {
-
-    private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class);
-
-    private int index;
-
-    private final Selector selector;
-
-    private volatile boolean working = false;
-
-    private long selectTimeout = 100;
-
-    private Queue<Task> tasks = new ConcurrentLinkedQueue<>();
-
-    private SessionMapper sessionMapper;
-
-    private volatile boolean collectMetrics = false;
-
-    private long connectionTimeout = 5_000;
-
-    private EmitterMetric metric;
-
-    NioEmitterWorker(int index) throws IOException {
-        super("NioEmitterWorker-" + index);
-        selector = Selector.open();
-    }
-
-    int getIndex() {
-        return index;
-    }
-
-    public long getSelectTimeout() {
-        return selectTimeout;
-    }
-
-    public void setSelectTimeout(long selectTimeout) {
-        Assert.greaterThanZero(selectTimeout, "selectTimeout");
-        this.selectTimeout = selectTimeout;
-    }
-
-    public long getConnectionTimeout() {
-        return connectionTimeout;
-    }
-
-    public void setConnectionTimeout(long connectionTimeout) {
-        this.connectionTimeout = connectionTimeout;
-    }
-
-    public SessionMapper getSessionMapper() {
-        return sessionMapper;
-    }
-
-    public void setSessionMapper(SessionMapper sessionMapper) {
-        this.sessionMapper = sessionMapper;
-    }
-
-    public void setWorking(boolean working) {
-        this.working = working;
-        if (this.working && !working) {
-            try {
-                selector.close();
-            } catch (IOException ex) {
-                LOGGER.warn(ex.getMessage(), ex);
-            }
-        }
-    }
-
-    @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        if (collectMetrics && metric == null) {
-            metric = new EmitterMetric();
-            try {
-                metric.activate();
-            } catch (Exception e) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Error occured during metric activation. " + e.getMessage(), e);
-                }
-            }
-            this.collectMetrics = true;
-        } else if (!collectMetrics && metric != null) {
-            this.collectMetrics = false;
-            try {
-                metric.deactivate();
-            } catch (Exception e) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e);
-                }
-            }
-
-            metric = null;
-        }
-    }
-
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        if (collectMetrics) {
-            container.update(System.currentTimeMillis(), metric);
-            metric.reset();
-        }
-    }
-
-    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
-        tasks.add(new ConnectTask(sessionInfo, handler));
-        selector.wakeup();
-    }
-
-    void flush(SelectionKey key) {
-        tasks.add(new FlushTask(key));
-        selector.wakeup();
-    }
-
-    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
-        try {
-            ConnectionParams connParams = sessionMapper.map(sessionInfo);
-            if (connParams == null) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Unable to map session '" + sessionInfo + "'.");
-                }
-
-                if (collectMetrics) {
-                    synchronized (metric) {
-                        metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
-                    }
-                }
-
-                try {
-                    handler.sessionInvalidated(sessionInfo);
-                } catch (Exception e) {
-                    LOGGER.debug(e.getMessage(), e);
-                }
-
-                return;
-            }
-
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'.");
-            }
-
-            SocketChannel channel = SocketChannel.open();
-            channel.configureBlocking(false);
-
-            SocketAddress bindAddress = connParams.getBindAddress();
-            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
-                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
-            }
-
-            SocketAddress remoteAddress = connParams.getRemoteAddress();
-            if (remoteAddress == null) {
-                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
-            }
-
-            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
-            KeyContext keyContext = new KeyContext(channelContext, handler);
-            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
-            try {
-                handler.channelRegistered(channelContext);
-            } catch (Exception ex) {
-                doCatchException(key, ex);
-            }
-
-            channelContext.selectionKey(key);
-            try {
-                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
-            } catch (Exception ex) {
-                doCatchException(key, ex);
-                return;
-            }
-
-            selector.wakeup();
-        } catch (Exception e) {
-            if (collectMetrics) {
-                metric.errorCaught(e);
-            }
-            LOGGER.error(e.getMessage(), e);
-        }
-
-    }
-
-    private void doFinishConnect(SelectionKey key) {
-        SocketChannel channel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
-        }
-
-        try {
-            long connStart = System.currentTimeMillis();
-            boolean timeouted = false;
-            while (!channel.finishConnect()) {
-                long now = System.currentTimeMillis();
-                if (now - connStart < connectionTimeout) {
-                    timeouted = true;
-                    break;
-                }
-            }
-            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-
-            if (timeouted) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
-                }
-
-                throw new ConnectException("Connection timed out.");
-
-            }
-        } catch (Exception e) {
-            doCatchException(key, e);
-            key.cancel();
-            if (collectMetrics) {
-                metric.incConnectionsErrors();
-            }
-
-            return;
-        }
-
-        try {
-            if (collectMetrics) {
-                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
-                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
-            }
-
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
-            }
-
-            keyContext.handler.channelActive(keyContext.channelContext);
-            setOpRead(key);
-        } catch (Exception ex) {
-            LOGGER.error(ex.getMessage(), ex);
-        }
-    }
-
-    private void doWrite(SelectionKey key) {
-        SocketChannel socketChannel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Writing ({} -> {}).",
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        keyContext.handler.dataWriteStart(keyContext.channelContext);
-        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
-        int written = 0;
-        try {
-            ByteBuffer buffer;
-            while (!queue.isEmpty()) {
-                buffer = queue.poll();
-                while (buffer.hasRemaining()) {
-                    int res = socketChannel.write(buffer);
-
-                    if (res == -1) {
-                        doClose(key);
-                        return;
-                    }
-
-                    if (collectMetrics) {
-                        metric.updateSentBytes(res);
-                    }
-
-                    written += res;
-                }
-            }
-        } catch (Exception e) {
-            doCatchException(key, e);
-            doClose(key);
-            return;
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Written {}B ({} -> {})", written,
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        //TODO Operacje na handlerach powinny przechodzic przez Executor
-        try {
-            keyContext.handler.dataWritten(keyContext.channelContext);
-            LOGGER.debug("Write handled.");
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        setOpRead(key);
-        clearOpWrite(key);
-    }
-
-    private void doRead(SelectionKey key) {
-        SocketChannel channel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Reading ({} -> {})",
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        ByteBuffer buffer = keyContext.buffer;
-
-        buffer.clear();
-
-        ByteBuff buff = new HeapByteBuff();
-        int totalReaded = 0;
-        int readed;
-        try {
-            while ((readed = channel.read(buffer)) > 0) {
-                buffer.flip();
-                buff.append(buffer.array(), buffer.position(), buffer.limit());
-                buffer.clear();
-                totalReaded += readed;
-
-                if (collectMetrics) {
-                    metric.updateReceivedBytes(readed);
-                }
-            }
-        } catch (IOException e) {
-            doCatchException(key, e);
-            doClose(key);
-            return;
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Readed {}B ({} -> {})", totalReaded,
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        if (totalReaded > 0) {
-            try {
-                keyContext.handler.dataReceived(keyContext.channelContext, buff);
-                LOGGER.debug("Read handled.");
-            } catch (Exception e) {
-                LOGGER.debug(e.getMessage(), e);
-            }
-        }
-
-        if (readed == -1) {
-            doClose(key);
-            return;
-        }
-
-        keyContext.buffer.flip();
-    }
-
-    private void doCatchException(SelectionKey key, Throwable cause) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Error occured. " + cause.getMessage(), cause);
-        }
-
-        if (collectMetrics) {
-            synchronized (metric) {
-                metric.errorCaught(cause);
-            }
-        }
-
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        try {
-            keyContext.handler.errorOccured(keyContext.channelContext, cause);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-    }
-
-    private void doClose(SelectionKey key) {
-        if (!key.channel().isOpen()) {
-            selector.wakeup();
-            return;
-        }
-
-        try {
-            key.channel().close();
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        KeyContext keyContext = (KeyContext) key.attachment();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
-        }
-
-        try {
-            keyContext.handler.channelInactive(keyContext.channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        key.cancel();
-        try {
-            keyContext.handler.channelUnregistered(keyContext.channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
-        }
-
-        if (collectMetrics) {
-            metric.incClosedConnections();
-        }
-
-        selector.wakeup();
-    }
-
-    static void setOpRead(SelectionKey key) {
-        if (!key.isValid() || key.isReadable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-        key.selector().wakeup();
-    }
-
-    static void clearOpRead(SelectionKey key) {
-        if (!key.isValid() || !key.isReadable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-        key.selector().wakeup();
-    }
-
-    static void setOpWrite(SelectionKey key) {
-        if (!key.isValid() || key.isWritable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-        key.selector().wakeup();
-    }
-
-    static void clearOpWrite(SelectionKey key) {
-        if (!key.isValid() || !key.isWritable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-        key.selector().wakeup();
-    }
-
-    void requestClose(SelectionKey key) {
-        tasks.add(new CloseTask(key));
-        key.selector().wakeup();
-    }
-
-    @Override
-    public void run() {
-        int selected = 0;
-        working = true;
-        while (working) {
-            if (!tasks.isEmpty()) {
-                Task task;
-                while ((task = tasks.poll()) != null) {
-                    if (task.code == Task.CLOSE) {
-                        doClose(((CloseTask) task).key);
-                    } else if (task.code == Task.CONNECT) {
-                        ConnectTask taskConn = (ConnectTask) task;
-                        doConnect(taskConn.sessionInfo, taskConn.handler);
-                    } else if (task.code == Task.FLUSH) {
-                        FlushTask flushTask = (FlushTask) task;
-                        setOpWrite(flushTask.key);
-                    }
-                }
-            }
-
-            try {
-                selected = selector.select(selectTimeout);
-            } catch (IOException ex) {
-                LOGGER.warn(ex.getMessage(), ex);
-            }
-
-            if (selected > 0) {
-                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
-                while (it.hasNext()) {
-                    SelectionKey key = it.next();
-                    it.remove();
-
-                    if (!key.isValid()) {
-                        continue;
-                    }
-
-                    if (key.isConnectable()) {
-                        doFinishConnect(key);
-                    } else if (key.isWritable()) {
-                        doWrite(key);
-                    } else if (key.isReadable()) {
-                        doRead(key);
-                    }
-                }
-            }
-        }
-    }
-
-    private static class KeyContext {
-
-        private final EmitterHandler handler;
-
-        private final NioChannelContext channelContext;
-
-        private ByteBuffer buffer = ByteBuffer.allocate(1024);
-
-        public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
-            this.channelContext = channelContext;
-            this.handler = handler;
-        }
-
-    }
-
-    private static abstract class Task {
-
-        public static final int CLOSE = 1;
-        public static final int CONNECT = 2;
-        public static final int FLUSH = 3;
-
-        private final int code;
-
-        public Task(int code) {
-            this.code = code;
-        }
-
-    }
-
-    private final static class ConnectTask extends Task {
-
-        private final SessionInfo sessionInfo;
-        private final EmitterHandler handler;
-
-        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
-            super(CONNECT);
-            this.sessionInfo = sessionInfo;
-            this.handler = handler;
-        }
-
-    }
-
-    private final static class CloseTask extends Task {
-
-        private final SelectionKey key;
-
-        public CloseTask(SelectionKey key) {
-            super(CLOSE);
-            this.key = key;
-        }
-
-    }
-
-    private final static class FlushTask extends Task {
-
-        private final SelectionKey key;
-
-        public FlushTask(SelectionKey key) {
-            super(FLUSH);
-            this.key = key;
-        }
-
-    }
-}
+package com.passus.st.emitter.nio;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.EmitterMetric;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.metric.MetricSource;
+import java.io.IOException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Base class for NIO emitter worker threads; keeps the settings and the metric shared by implementations.
+ * @author Mirosław Hawrot
+ */
+public abstract class NioEmitterWorker extends Thread implements MetricSource {
+
+    protected final Logger logger = LogManager.getLogger(getClass());
+    /** Index given to the constructor; also the suffix of the thread name. */
+    private final int index;
+    /** Selector wait timeout; NioEmitterWorker2 passes it to Selector.select(long), i.e. milliseconds. */
+    protected long selectTimeout = 100;
+    /** Session mapper stored by setSessionMapper(); this class itself never reads it. */
+    protected SessionMapper sessionMapper;
+    /** True while metrics are collected; switched together with {@link #metric} in setCollectMetrics(). */
+    protected volatile boolean collectMetrics = false;
+    /** Connection timeout; NioEmitterWorker2 compares it with System.currentTimeMillis() differences (ms). */
+    protected long connectionTimeout = 5_000;
+    /** Metric instance; created when collection is enabled and reset to null when it is disabled. */
+    protected EmitterMetric metric;
+    /** Creates a worker thread named "NioEmitterWorker-" followed by the given index. */
+    protected NioEmitterWorker(int index) throws IOException {
+        super("NioEmitterWorker-" + index);
+        this.index = index;
+    }
+    /** Returns the index given to the constructor. */
+    public int getIndex() {
+        return index;
+    }
+    /** Requests a connection for the given session; the handler receives the channel events. */
+    public abstract void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException;
+    /** Switches the worker on or off; the exact semantics are defined by the implementation. */
+    public abstract void setWorking(boolean b);
+
+    public long getSelectTimeout() {
+        return selectTimeout;
+    }
+    /** Sets the selector wait timeout; the value is validated with Assert.greaterThanZero. */
+    public void setSelectTimeout(long selectTimeout) {
+        Assert.greaterThanZero(selectTimeout, "selectTimeout");
+        this.selectTimeout = selectTimeout;
+    }
+
+    public long getConnectionTimeout() {
+        return connectionTimeout;
+    }
+    /** Sets the connection timeout; unlike the select timeout, the value is not validated. */
+    public void setConnectionTimeout(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+    /** Enables metric collection (creates and activates the metric) or disables it (deactivates and drops it). */
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        if (collectMetrics && metric == null) {
+            metric = new EmitterMetric();
+            try {
+                metric.activate();
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Error occured during metric activation. " + e.getMessage(), e);
+                }
+            }
+            this.collectMetrics = true; // raised only after the metric object exists
+        } else if (!collectMetrics && metric != null) {
+            this.collectMetrics = false; // lowered before the metric is deactivated and dropped
+            try {
+                metric.deactivate();
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Error occured during metric deactivation. " + e.getMessage(), e);
+                }
+            }
+
+            metric = null;
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,787 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.EmitterMetric;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricsContainer;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+
+/**
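+ * Selector-based worker: registers socket channels with its own Selector and handles connect, write and read events in run().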
+ * @author Mirosław Hawrot
+ */
+public class NioEmitterWorker2 extends NioEmitterWorker {
+
+    private final Selector selector;
+
+    private volatile boolean working = false;
+
+    private final Queue<Task> tasks = new ConcurrentLinkedQueue<>();
+
+    private boolean reusePort = true;
+
+    private boolean tcpNoDelay = true;
+
+    private final AtomicBoolean wakeUp = new AtomicBoolean(false);
+
+    private final Object selectorLock1 = new Object();
+
+    private final Object selectorLock2 = new Object();
+
+    private long wakeUpTime;
+
+    public NioEmitterWorker2(int index) throws IOException {
+        super(index);
+        selector = Selector.open();
+    }
+
+    public boolean isReusePort() {
+        return reusePort;
+    }
+
+    public void setReusePort(boolean reusePort) {
+        this.reusePort = reusePort;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        if (collectMetrics && metric == null) {
+            metric = new EmitterMetric();
+            try {
+                metric.activate();
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Error occured during metric activation. " + e.getMessage(), e);
+                }
+            }
+            this.collectMetrics = true;
+        } else if (!collectMetrics && metric != null) {
+            this.collectMetrics = false;
+            try {
+                metric.deactivate();
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Error occured during metric deactivation. " + e.getMessage(), e);
+                }
+            }
+
+            metric = null;
+        }
+    }
+
+    public boolean isWorking() {
+        return working;
+    }
+
+    /** Stops the worker thread when {@code working} is false; passing true is a no-op. */
+    @Override
+    public void setWorking(boolean working) {
+        if (this.working && !working) {
+            this.working = false;
+            tasks.clear();
+            wakeUp();
+            try {
+                if (Thread.currentThread() != this) {
+                    interrupt();
+                    join(); // let run() leave select() before the selector is closed
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the caller's interrupt status
+            }
+            try {
+                selector.close();
+            } catch (IOException ex) {
+                logger.warn(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset();
+        }
+    }
+
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
+        doRegister(sessionInfo, handler);
+    }
+
+    void flush(SelectionKey key) {
+        tasks.add(new FlushTask(key));
+        selector.wakeup();
+    }
+
+    private void doRegister(SessionInfo sessionInfo, EmitterHandler handler) {
+        long t0 = System.currentTimeMillis();
+        try {
+            /*ConnectionParams connParams = sessionMapper.map(sessionInfo);
+            if (connParams == null) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Unable to map session '" + sessionInfo + "'.");
+                }
+
+                try {
+                    handler.sessionInvalidated(sessionInfo);
+                } catch (Exception e) {
+                    logger.debug(e.getMessage(), e);
+                }
+
+                return;
+            }*/
+
+            if (logger.isDebugEnabled()) {
+                //logger.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'.");
+                logger.debug("Registering session '" + sessionInfo + "'.");
+            }
+
+            SocketChannel channel = SocketChannel.open();
+            channel.setOption(StandardSocketOptions.SO_REUSEADDR, reusePort);
+            channel.setOption(StandardSocketOptions.SO_LINGER, 0);
+            channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
+
+            channel.configureBlocking(false);
+
+            //SocketAddress bindAddress = connParams.getBindAddress();
+            /*if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }*/
+
+ /*SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }*/
+            SocketAddress remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+
+            NioChannelContext2 channelContext = new NioChannelContext2(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+
+            SelectionKey key;
+            synchronized (selectorLock2) {
+                wakeUp();
+
+                synchronized (selectorLock1) {
+                    key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
+                    channelContext.regTime = System.currentTimeMillis();
+                }
+
+                channelContext.selectionKey(key);
+
+                try {
+                    handler.channelRegistered(channelContext);
+                } catch (Exception ex) {
+                    doCatchException(key, ex);
+                }
+
+                try {
+                    channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+                } catch (Exception ex) {
+                    doCatchException(key, ex);
+                    doClose(key);
+                    return;
+                }
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Session registered '{}'.", sessionInfo);
+                }
+            }
+
+            wakeUp();
+        } catch (Exception e) {
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+        }
+
+        /*long t1 = System.currentTimeMillis();
+        if (t1 - t0 > 2) {
+            System.out.println("Register: " + (t1 - t0));
+        }*/
+    }
+
+    /**
+     * Completes a pending non-blocking connect for the given key and notifies the handler.
+     * On failure or timeout the error is reported to the handler and the channel is closed.
+     *
+     * @param key selection key reported as connectable; its attachment must be a KeyContext
+     */
+    private void doFinishConnect(SelectionKey key) {
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Connecting to '{}'.", keyContext.channelContext.getRemoteAddress());
+        }
+
+        try {
+            long connStart = System.currentTimeMillis();
+            boolean timeouted = false;
+            while (!channel.finishConnect()) {
+                long now = System.currentTimeMillis();
+                // Give up only once the configured timeout has actually elapsed.
+                if (now - connStart >= connectionTimeout) {
+                    timeouted = true;
+                    break;
+                }
+            }
+
+            long t1 = System.currentTimeMillis();
+            if (t1 - keyContext.channelContext.regTime > 10) {
+                logger.debug("Connect delay: {} ms.", t1 - keyContext.channelContext.regTime);
+            }
+
+            clearOpConnect(key);
+            setOpRead(key);
+            if (timeouted) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Connection '{}' timed out.", keyContext.channelContext.getRemoteAddress());
+                }
+
+                throw new ConnectException("Connection timed out.");
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            if (collectMetrics) {
+                metric.incConnectionsErrors();
+            }
+
+            return;
+        }
+
+        try {
+            if (collectMetrics) {
+                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
+                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
+            }
+
+            keyContext.handler.channelActive(keyContext.channelContext);
+        } catch (Exception ex) {
+            logger.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Writes the buffers queued on the channel context to the socket.
+     * A buffer the socket accepts only partially stays queued and OP_WRITE stays set, so the
+     * rest is sent on the next writable event instead of busy-waiting on the worker thread.
+     *
+     * @param key selection key reported as writable; its attachment must be a KeyContext
+     */
+    private void doWrite(SelectionKey key) {
+        SocketChannel socketChannel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress());
+        }
+        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
+        int written = 0;
+        try {
+            ByteBuffer buffer;
+            while ((buffer = queue.peek()) != null) {
+                written += socketChannel.write(buffer);
+                if (buffer.hasRemaining()) {
+                    break; // send buffer is full: keep this buffer queued for the next pass
+                }
+                queue.poll();
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+        if (collectMetrics) {
+            metric.updateSentBytes(written);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Written '" + written + "' to '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+        if (!queue.isEmpty()) {
+            return; // more data pending: dataWritten() is reported once the queue is drained
+        }
+        //TODO Handler operations should go through an Executor
+        try {
+            keyContext.handler.dataWritten(keyContext.channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        setOpRead(key);
+        clearOpWrite(key);
+    }
+
+    private int read(SocketChannel channel, ByteBuffer buffer, ByteBuff out) throws IOException {
+        int readed;
+        int totalReaded = 0;
+        while ((readed = channel.read(buffer)) > 0) {
+            buffer.flip();
+            out.append(buffer.array(), buffer.position(), buffer.limit());
+            buffer.clear();
+            if (readed < 0) {
+                return -1;
+            }
+
+            totalReaded += readed;
+        }
+
+        return totalReaded;
+    }
+
+    private void doRead(SelectionKey key) {
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reading from '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        ByteBuffer buffer = keyContext.buffer;
+
+        keyContext.buffer.clear();
+
+        ByteBuff buff = new HeapByteBuff();
+        int totalReaded = 0;
+        int readed;
+        try {
+            while ((readed = channel.read(buffer)) > 0) {
+                buffer.flip();
+                buff.append(buffer.array(), buffer.position(), buffer.limit());
+                buffer.clear();
+                totalReaded += readed;
+            }
+        } catch (IOException e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (collectMetrics) {
+            metric.updateReceivedBytes(totalReaded);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug(totalReaded + " readed from '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        try {
+            keyContext.handler.dataReceived(keyContext.channelContext, buff);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        if (readed == -1) {
+            doClose(key);
+            return;
+        }
+
+        keyContext.buffer.flip();
+    }
+
+    private void doCatchException(SelectionKey key, Throwable cause) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occured. " + cause.getMessage(), cause);
+        }
+
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        try {
+            keyContext.handler.errorOccured(keyContext.channelContext, cause);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    void doClose(SelectionKey key) {
+        if (!key.isValid()) {
+            return;
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (key.channel().isOpen()) {
+            try {
+                key.channel().close();
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+        try {
+            keyContext.handler.channelInactive(keyContext.channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+    }
+
+    private void sslDoHandshake(SocketChannel socketChannel, SSLEngine engine,
+            ByteBuffer myNetData, ByteBuffer peerNetData) throws Exception {
+
+        // Create byte buffers to use for holding application data
+        int appBufferSize = engine.getSession().getApplicationBufferSize();
+        ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
+        ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
+
+        // Begin handshake
+        engine.beginHandshake();
+        SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+        // Process handshaking message
+        while (hs != SSLEngineResult.HandshakeStatus.FINISHED
+                && hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+            switch (hs) {
+                case NEED_UNWRAP:
+                    // Receive handshaking data from peer
+                    if (socketChannel.read(peerNetData) < 0) {
+                        // Handle closed channel
+                    }
+
+                    // Process incoming handshaking data
+                    peerNetData.flip();
+                    SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+                    peerNetData.compact();
+                    hs = res.getHandshakeStatus();
+
+                    switch (res.getStatus()) {
+                        case OK:
+                            // Handle OK status
+                            break;
+                        case BUFFER_UNDERFLOW:
+                            break;
+                        case BUFFER_OVERFLOW:
+                            break;
+                        case CLOSED:
+                            break;
+                        // Handle other status: BUFFER_UNDERFLOW, BUFFER_OVERFLOW, CLOSED
+                    }
+                    break;
+
+                case NEED_WRAP:
+                    // Empty the local network packet buffer.
+                    myNetData.clear();
+
+                    // Generate handshaking data
+                    res = engine.wrap(myAppData, myNetData);
+                    hs = res.getHandshakeStatus();
+
+                    // Check status
+                    switch (res.getStatus()) {
+                        case OK:
+                            myNetData.flip();
+
+                            // Send the handshaking data to peer
+                            while (myNetData.hasRemaining()) {
+                                if (socketChannel.write(myNetData) < 0) {
+                                    // Handle closed channel
+                                }
+                            }
+                            break;
+
+                        // Handle other status:  BUFFER_OVERFLOW, BUFFER_UNDERFLOW, CLOSED
+                    }
+                    break;
+
+                case NEED_TASK:
+                    // Handle blocking tasks
+                    break;
+
+                // Handle other status:  // FINISHED or NOT_HANDSHAKING
+            }
+        }
+        // Processes after handshaking
+    }
+
+    private void sslDoWrap(SocketChannel socketChannel) {
+
+    }
+
+    private void sslDoUnwrap(SocketChannel socketChannel, SSLEngine engine, ByteBuffer netData, ByteBuffer appData) {
+        SSLEngineResult res;
+        try {
+            res = engine.unwrap(netData, appData);
+        } catch (Exception e) {
+            return;
+        }
+
+        switch (res.getStatus()) {
+            case OK:
+                break;
+            case CLOSED:
+                break;
+            case BUFFER_OVERFLOW:
+                // Maybe need to enlarge the peer application data buffer.
+                if (engine.getSession().getApplicationBufferSize()
+                        > appData.capacity()) {
+                    // enlarge the peer application data buffer
+                } else {
+                    // compact or clear the buffer
+                }
+                // retry the operation
+                break;
+
+            case BUFFER_UNDERFLOW:
+                // Maybe need to enlarge the peer network packet buffer
+                if (engine.getSession().getPacketBufferSize()
+                        > netData.capacity()) {
+                    // enlarge the peer network packet buffer
+                } else {
+                    // compact or clear the buffer
+                }
+                // obtain more inbound network data and then retry the operation
+                break;
+        }
+    }
+
+    private void wakeUp() {
+        if (wakeUp.compareAndSet(false, true)) {
+            wakeUpTime = System.currentTimeMillis();
+            selector.wakeup();
+        }
+    }
+
+    private void clearOpConnect(SelectionKey key) {
+        // Test the interest set; isConnectable() only reflects the ready set of the last select().
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_CONNECT) == 0) {
+            return;
+        }
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+    }
+
+    private void setOpRead(SelectionKey key) {
+        // Test the interest set; isReadable() only reflects the ready set of the last select().
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_READ) != 0) {
+            return;
+        }
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+        wakeUp();
+    }
+
+    private void clearOpRead(SelectionKey key) {
+        // Test the interest set; isReadable() only reflects the ready set of the last select().
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_READ) == 0) {
+            return;
+        }
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    }
+
+    private void setOpWrite(SelectionKey key) {
+        // Test the interest set: a stale ready bit in isWritable() would silently drop a flush.
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+            return;
+        }
+        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        wakeUp();
+    }
+
+    private void clearOpWrite(SelectionKey key) {
+        // Test the interest set; isWritable() only reflects the ready set of the last select().
+        if (!key.isValid() || (key.interestOps() & SelectionKey.OP_WRITE) == 0) {
+            return;
+        }
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+    }
+
+    void requestClose(SelectionKey key) {
+        tasks.add(new CloseTask(key));
+        wakeUp();
+    }
+
+    /**
+     * Worker loop: drains queued tasks, waits on the selector and dispatches ready keys
+     * until the {@code working} flag is cleared by {@code setWorking(false)}.
+     *
+     * <p>Each pass applies the CLOSE/FLUSH requests queued by other threads, calls
+     * {@code select(selectTimeout)} under {@code selectorLock1} and then handles the
+     * selected keys under {@code selectorLock2}.
+     *
+     * <p>If the selector is closed while the loop is stopping, the resulting
+     * ClosedSelectorException/CancelledKeyException ends the loop instead of escaping.
+     */
+    @Override
+    public void run() {
+        working = true;
+
+        try {
+            while (working) {
+                // Requests queued by other threads are applied here, on the worker thread.
+                Task task;
+                while ((task = tasks.poll()) != null) {
+                    if (task.code == Task.CLOSE) {
+                        doClose(((CloseTask) task).key);
+                    } else if (task.code == Task.FLUSH) {
+                        FlushTask flushTask = (FlushTask) task;
+                        setOpWrite(flushTask.key);
+                    }
+                }
+
+                // Reset on every pass so a failed select() cannot reuse the previous count.
+                int selected = 0;
+
+                // doRegister() takes selectorLock1 to register a channel, so it can only
+                // do that between two select() calls.
+                synchronized (selectorLock1) {
+                    try {
+                        selected = selector.select(selectTimeout);
+                        wakeUp.set(false);
+                    } catch (IOException ex) {
+                        logger.warn(ex.getMessage(), ex);
+                    }
+                }
+
+                // doRegister() holds selectorLock2 for its whole registration sequence.
+                synchronized (selectorLock2) {
+                    if (selected > 0) {
+                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                        while (it.hasNext()) {
+                            SelectionKey key = it.next();
+                            it.remove();
+
+                            if (!key.isValid()) {
+                                // The key is no longer valid: report the channel as unregistered.
+                                try {
+                                    KeyContext keyContext = (KeyContext) key.attachment();
+                                    keyContext.handler.channelUnregistered(keyContext.channelContext);
+                                } catch (Exception e) {
+                                    logger.debug(e.getMessage(), e);
+                                }
+                                continue;
+                            }
+
+                            // One operation per key and pass: connect, else write, else read.
+                            if (key.isConnectable()) {
+                                doFinishConnect(key);
+                            } else if (key.isWritable()) {
+                                doWrite(key);
+                            } else if (key.isReadable()) {
+                                doRead(key);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (java.nio.channels.ClosedSelectorException
+                | java.nio.channels.CancelledKeyException ex) {
+            // Expected when setWorking(false) closes the selector under this thread.
+            // While the worker is still supposed to run it is a real error, so it is
+            // rethrown exactly as it would have propagated before.
+            if (working) {
+                throw ex;
+            }
+        }
+    }
+
+    private static class KeyContext {
+
+        private final EmitterHandler handler;
+
+        private final NioChannelContext2 channelContext;
+
+        private ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+        public KeyContext(NioChannelContext2 channelContext, EmitterHandler handler) {
+            this.channelContext = channelContext;
+            this.handler = handler;
+        }
+
+    }
+
+    private static abstract class Task {
+
+        public static final int CLOSE = 1;
+        public static final int CONNECT = 2;
+        public static final int FLUSH = 3;
+
+        private final int code;
+
+        public Task(int code) {
+            this.code = code;
+        }
+
+    }
+
+    private final static class ConnectTask extends Task {
+
+        private final SessionInfo sessionInfo;
+        private final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
+            super(CONNECT);
+            this.sessionInfo = sessionInfo;
+            this.handler = handler;
+        }
+
+    }
+
+    private final static class CloseTask extends Task {
+
+        private final SelectionKey key;
+
+        public CloseTask(SelectionKey key) {
+            super(CLOSE);
+            this.key = key;
+        }
+
+    }
+
+    private final static class FlushTask extends Task {
+
+        private final SelectionKey key;
+
+        public FlushTask(SelectionKey key) {
+            super(FLUSH);
+            this.key = key;
+        }
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterWorker2Test.java	Tue Oct 24 15:26:49 2017 +0200
@@ -0,0 +1,156 @@
+package com.passus.st.emitter.nio;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.http.HttpConsts;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestBuilder;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.client.TestClientHandler;
+import com.passus.st.client.TestClientHandler.ClientEvent;
+import com.passus.st.client.TestClientHandler.EventType;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.SessionInfo;
+import java.net.ConnectException;
+import org.testng.AssertJUnit;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link NioEmitter} instances constructed with {@code NioEmitterWorker2.class}.
+ * HOST and PORT are not declared in this class (presumably inherited from
+ * {@link AbstractWireMockTest}).
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioEmitterWorker2Test extends AbstractWireMockTest {
+
+    /** Waits up to five seconds; see {@link #waitConn(TestClientHandler, int, long)}. */
+    private void waitConn(TestClientHandler handler, int expectedEventsSize) {
+        waitConn(handler, expectedEventsSize, 5_000);
+    }
+
+    /**
+     * Polls every 100 ms until the handler has recorded at least {@code expectedEventsSize}
+     * events, reports the channel as unregistered, or {@code timeout} milliseconds elapse.
+     * Returns early, with the interrupt flag restored, if the waiting thread is interrupted.
+     */
+    private void waitConn(TestClientHandler handler, int expectedEventsSize, long timeout) {
+        long endTime = System.currentTimeMillis() + timeout;
+        if (endTime <= 0) {
+            // Non-positive deadline (e.g. the sum overflowed for a huge timeout): no deadline.
+            endTime = Long.MAX_VALUE;
+        }
+
+        while (handler.size() < expectedEventsSize
+                && System.currentTimeMillis() < endTime
+                && !handler.isChannelUnregistered()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller instead of swallowing it.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /** Closing in {@code doChannelActive} yields registered, active and inactive events. */
+    @Test
+    public void testConnectAndClose() throws Exception {
+        NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class);
+        emitter.setMaxThreads(1);
+        emitter.start();
+        SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, PORT);
+
+        TestClientHandler handler = new TestClientHandler() {
+            @Override
+            protected void doChannelActive(ChannelContext context) throws Exception {
+                context.close();
+            }
+        };
+
+        emitter.connect(info, handler, 0);
+        waitConn(handler, 3);
+
+        assertEquals(3, handler.size());
+        assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType());
+        assertEquals(EventType.CHANNEL_ACTIVE, handler.get(1).getType());
+        assertEquals(EventType.CHANNEL_INACTIVE, handler.get(2).getType());
+    }
+
+    /**
+     * Sends a GET request once the channel is active and checks that the stubbed body is
+     * received, in one or more chunks, before the channel becomes inactive.
+     */
+    @Test
+    public void testWriteAndRead() throws Exception {
+        String path = "/some/thing";
+        String url = "http://localhost" + path;
+        HttpRequest req = HttpRequestBuilder.get(url).version(HttpConsts.VERSION_1_0).build();
+        String content = "Hello world!";
+        stubFor(get(urlEqualTo(path))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", String.valueOf(content.length()))
+                        .withBody(content)));
+
+        NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class);
+        emitter.setMaxThreads(1);
+        emitter.start();
+        SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, PORT);
+
+        TestClientHandler handler = new TestClientHandler() {
+
+            @Override
+            protected void doChannelActive(ChannelContext context) throws Exception {
+                ByteBuff buff = new HeapByteBuff();
+                HttpRequestEncoder.getInstance().encode(req, buff);
+                context.writeAndFlush(buff);
+            }
+
+            @Override
+            protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception {
+                // The stubbed body ends with '!', so seeing it means the response is complete.
+                if (data.toString().contains("!")) {
+                    context.close();
+                }
+            }
+
+        };
+
+        emitter.connect(info, handler, 0);
+        waitConn(handler, 20, 5_000);
+
+        assertTrue(handler.size() >= 6);
+        int index = 0;
+        assertEquals(EventType.CHANNEL_REGISTERED, handler.get(index++).getType());
+        assertEquals(EventType.CHANNEL_ACTIVE, handler.get(index++).getType());
+        assertEquals(EventType.DATA_WRITTEN, handler.get(index++).getType());
+
+        ByteBuff resContentBuff = new HeapByteBuff();
+
+        // Collect every consecutive DATA_RECEIVED event without reading past the last event.
+        do {
+            ClientEvent event = handler.get(index++);
+            assertEquals(EventType.DATA_RECEIVED, event.getType());
+            resContentBuff.append(event.getData());
+        } while (index < handler.size()
+                && handler.get(index).getType() == EventType.DATA_RECEIVED);
+
+        assertTrue("no event recorded after the received data", index < handler.size());
+        assertEquals(EventType.CHANNEL_INACTIVE, handler.get(index++).getType());
+        // NOTE(review): a following CHANNEL_UNREGISTERED event is not asserted here.
+        String responseContent = resContentBuff.toString();
+        assertTrue(responseContent.endsWith(content));
+    }
+
+    /**
+     * Connecting to port 10000 (presumably with no listener) must be reported to the handler
+     * as a {@link ConnectException} right after the channel is registered.
+     */
+    @Test
+    public void testError_ConnectionRefused() throws Exception {
+        NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class);
+        emitter.setMaxThreads(1);
+        emitter.start();
+        SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, 10_000);
+
+        TestClientHandler handler = new TestClientHandler() {
+            @Override
+            protected void doChannelActive(ChannelContext context) throws Exception {
+                context.close();
+            }
+        };
+
+        emitter.connect(info, handler, 0);
+        waitConn(handler, 2);
+
+        // Fail with a clear assertion instead of an index error when events are missing.
+        assertTrue("expected at least two events", handler.size() >= 2);
+        assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType());
+        ClientEvent clientEvent = handler.get(1);
+        assertEquals(EventType.ERROR_OCCURED, clientEvent.getType());
+        assertTrue(clientEvent.getCause() instanceof ConnectException);
+        assertTrue(clientEvent.getCause().getMessage().startsWith("Connection refused"));
+    }
+}