changeset 620:811aa52e7ebe http-asynch-worker

in progress
author Devel 2
date Fri, 13 Oct 2017 07:34:51 +0200
parents c4e1b90cb412
children
files stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/Test.java stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContextNoExecutors.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerNoExecutors.java
diffstat 10 files changed, 1937 insertions(+), 301 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Fri Oct 13 07:34:51 2017 +0200
@@ -242,11 +242,13 @@
             client.setCollectMetrics(true);
             client.setConnectPartialSession(cl.hasOption("ps"));
             client.setWokerType(cl.getOptionValue("wt", "synch"));
-            client.addListener((request, response, context) -> {
+            /*client.addListener((request, response, context) -> {
                 if (startTime == 0) {
                     startTime = System.currentTimeMillis();
                 }
-            });
+            });*/
+            
+            startTime = System.currentTimeMillis();
             if (cl.hasOption("pr")) {
                 if (clArgs.length != 1) {
                     throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file.");
@@ -263,6 +265,7 @@
                 client.setWorkersNum(clArgs.length);
                 client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher());
             }
+            emitter.setMaxThreads(1);
             emitter.start();
 
             if (cl.hasOption("rs")) {
@@ -438,26 +441,28 @@
 
             collector.collect();
 
-            executor.shutdownNow();
+            
             for (int i = 0; i < clArgs.length; i++) {
                 eventSrcs[i].stop();
             }
 
             client.stop();
             emitter.stop();
+            
+            collector.flush(true);
+            executor.shutdownNow();
+            printMetrics();
+            
             if (reporterClient != null) {
                 reporterClient.send(new MapMetric("endOfReplay", Collections.emptyMap()));
                 reporterClient.waitForEmptyQueue();
                 reporterClient.stop();
                 System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages());
             }
+            
             if (summaryListener != null) {
                 summaryListener.close();
             }
-
-            collector.flush(true);
-
-            printMetrics();
         } catch (ParseException e) {
             System.out.println(e.getMessage());
             printHelp(options);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/Test.java	Fri Oct 13 07:34:51 2017 +0200
@@ -0,0 +1,630 @@
+package com.passus.st;
+
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.PassThroughSessionMapper;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.nio.NioEmitterWorkerNoExecutors;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class Test {
+
+    private int threadsNum = 4;
+
+    private int loops = 10;
+
+    private int connsPerLoop = 40;
+
+    private boolean reuseAddress = true;
+
+    private String host = "127.0.0.1";
+
+    private int port = 80;
+
+    public static void main(String[] args) throws Exception {
+        //Log4jConfigurationFactory.enableFactory("debug");
+        Test test = new Test();
+        String testName = "all";
+        if (args.length == 1) {
+            testName = args[0];
+        }
+
+        test.run(testName);
+    }
+
+    public void run(String testName) throws Exception {
+        System.out.println("threads: " + threadsNum + " loops: " + loops + " connsPerLoop: " + connsPerLoop + " reuseAddress: " + reuseAddress);
+        if (testName.equals("all") || testName.equalsIgnoreCase("nio")) {
+            runNioTest();
+        }
+
+        if (testName.equals("all") || testName.equalsIgnoreCase("nio2")) {
+            runNio2Test();
+        }
+
+        if (testName.equals("all") || testName.equalsIgnoreCase("socket")) {
+            runSocketTest();
+        }
+
+        if (testName.equals("all") || testName.equalsIgnoreCase("threadPerSocket")) {
+            runThreadPerSocketTest();
+        }
+
+        if (testName.equals("all") || testName.equalsIgnoreCase("nioEmitter")) {
+            runNio2Test();
+            runNioEmitterWorker();
+        }
+
+        //runNioEmitterWorker();
+    }
+
+    public void runSocketTest() throws Exception {
+        System.out.println("Socket");
+
+        long startTime = System.currentTimeMillis();
+
+        SocketBunchThread[] threads = new SocketBunchThread[threadsNum];
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i] = new SocketBunchThread();
+            threads[i].loops = loops;
+            threads[i].connNum = connsPerLoop;
+            threads[i].reuseAddress = reuseAddress;
+            threads[i].remote = new InetSocketAddress(host, port);
+            threads[i].start();
+        }
+
+        int totalConn = 0;
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i].join();
+            totalConn += threads[i].totalConn;
+        }
+
+        System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn);
+        for (int i = 0; i < threadsNum; i++) {
+            System.out.println("time thread" + i + ": " + threads[i].duration);
+        }
+    }
+
+    public void runThreadPerSocketTest() throws Exception {
+        System.out.println("ThreadPerSocket");
+
+        long startTime = System.currentTimeMillis();
+
+        SocketThread[] threads = new SocketThread[connsPerLoop];
+        InetSocketAddress remote = new InetSocketAddress(host, port);
+        int totalConn = 0;
+
+        for (int i = 0; i < loops * threadsNum; i++) {
+            for (int j = 0; j < connsPerLoop; j++) {
+                threads[j] = new SocketThread(remote);
+                threads[j].start();
+                totalConn++;
+            }
+
+            for (int j = 0; j < connsPerLoop; j++) {
+                threads[j].join();
+            }
+
+            for (int j = 0; j < connsPerLoop; j++) {
+                threads[j].socket.close();
+            }
+        }
+
+        System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn);
+    }
+
+    public void runNioTest() throws Exception {
+        System.out.println("Nio");
+
+        long startTime = System.currentTimeMillis();
+
+        NioThread[] threads = new NioThread[threadsNum];
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i] = new NioThread();
+            threads[i].loops = loops;
+            threads[i].connNum = connsPerLoop;
+            threads[i].reuseAddress = reuseAddress;
+            threads[i].remote = new InetSocketAddress(host, port);
+            threads[i].start();
+        }
+
+        int totalConn = 0;
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i].join();
+            totalConn += threads[i].totalConn;
+        }
+
+        System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn);
+        for (int i = 0; i < threadsNum; i++) {
+            System.out.println("time thread" + i + ": " + threads[i].duration);
+        }
+
+    }
+
+    public void runNio2Test() throws Exception {
+        System.out.println("Nio2");
+
+        long startTime = System.currentTimeMillis();
+
+        Nio2Thread[] threads = new Nio2Thread[threadsNum];
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i] = new Nio2Thread();
+            threads[i].loops = loops;
+            threads[i].connNum = connsPerLoop;
+            threads[i].reuseAddress = reuseAddress;
+            threads[i].remote = new InetSocketAddress(host, port);
+            threads[i].start();
+        }
+
+        int totalConn = 0;
+        for (int i = 0; i < threadsNum; i++) {
+            threads[i].join();
+            totalConn += threads[i].totalConn;
+        }
+
+        System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn);
+        for (int i = 0; i < threadsNum; i++) {
+            System.out.println("time thread" + i + ": " + threads[i].duration);
+        }
+
+    }
+
+    public void runNioEmitterWorker() throws Exception {
+        System.out.println("NioEmitterWorker");
+        NioEmitterWorkerNoExecutors[] workers = new NioEmitterWorkerNoExecutors[threadsNum];
+
+        for (int i = 0; i < threadsNum; i++) {
+            workers[i] = new NioEmitterWorkerNoExecutors(i);
+            /*threads[i].loops = loops;
+            threads[i].connNum = connsPerLoop;
+            threads[i].reuseAddress = reuseAddress;*/
+            workers[i].setReusePort(reuseAddress);
+            workers[i].setSessionMapper(new PassThroughSessionMapper());
+            workers[i].start();
+        }
+
+        SessionInfo[] sessions = new SessionInfo[connsPerLoop];
+        for (int i = 0; i < connsPerLoop; i++) {
+            sessions[i] = new SessionInfo(
+                    new SocketAddress("1.1.1.1", i + 100),
+                    new SocketAddress(host, port)
+            );
+        }
+
+        int totalConn = 0;
+        long startTime = System.currentTimeMillis();
+        final List<ChannelContext> contexts = new ArrayList<>();
+        for (int i = 0; i < loops; i++) {
+            CountDownLatch activeChannels = new CountDownLatch(connsPerLoop * threadsNum);
+            CountDownLatch inactiveChannels = new CountDownLatch(connsPerLoop * threadsNum);
+            EmitterHandler handler = new EmitterHandler() {
+
+                @Override
+                public void channelActive(ChannelContext context) throws Exception {
+                    if (context != null) {
+                        contexts.add(context);
+                    } else {
+                        System.out.println("Null context.");
+                    }
+                    activeChannels.countDown();
+                }
+
+                @Override
+                public void channelInactive(ChannelContext context) throws Exception {
+                    inactiveChannels.countDown();
+                }
+
+                @Override
+                public void errorOccured(ChannelContext context, Throwable cause) throws Exception {
+                    cause.printStackTrace();
+                }
+
+            };
+
+            for (int j = 0; j < threadsNum; j++) {
+                NioEmitterWorkerNoExecutors worker = workers[j];
+                for (int k = 0; k < connsPerLoop; k++) {
+                    worker.connect(sessions[k], handler);
+                }
+            }
+
+            activeChannels.await();
+            totalConn += contexts.size();
+
+            for (ChannelContext context : contexts) {
+                if (context != null) {
+                    try {
+                        context.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+            inactiveChannels.await();
+
+            contexts.clear();
+        }
+
+        System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn);
+        for (int i = 0; i < threadsNum; i++) {
+            workers[i].setWorking(false);
+        }
+    }
+
+    public class SocketThread extends Thread {
+
+        private final Socket socket;
+
+        private final InetSocketAddress remote;
+
+        public SocketThread(InetSocketAddress remote) {
+            this.socket = new Socket();
+            this.remote = remote;
+        }
+
+        @Override
+        public void run() {
+            try {
+                socket.setReuseAddress(reuseAddress);
+                socket.setTcpNoDelay(true);
+                socket.connect(remote);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    public static class SocketBunchThread extends Thread {
+
+        private static final Logger LOGGER = LogManager.getLogger(SocketBunchThread.class);
+
+        private int loops = 40;
+
+        private int connNum = 100;
+
+        private int totalConn = 0;
+
+        private long duration = 0;
+
+        boolean reuseAddress = false;
+
+        private InetSocketAddress remote;
+
+        @Override
+        public void run() {
+            try {
+                long startTime = System.currentTimeMillis();
+                Socket[] sockets = new Socket[connNum];
+                for (int i = 0; i < loops; i++) {
+                    for (int j = 0; j < connNum; j++) {
+                        LOGGER.debug("Connecting to '{}.'", remote);
+                        Socket socket = new Socket();
+                        socket.setReuseAddress(reuseAddress);
+                        socket.setTcpNoDelay(true);
+                        socket.connect(remote);
+
+                        sockets[j] = socket;
+                        totalConn++;
+                        LOGGER.debug("Connected to '{}.'", remote);
+                    }
+
+                    for (int j = 0; j < connNum; j++) {
+                        Socket socket = sockets[j];
+                        LOGGER.debug("Closing session '{}<->{}.'", socket.getLocalAddress(), remote);
+                        sockets[j].close();
+                        LOGGER.debug("Closed session '{}<->{}.'", socket.getLocalAddress(), remote);
+                    }
+                }
+
+                duration = System.currentTimeMillis() - startTime;
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+    }
+
+    public static class NioThread extends Thread {
+
+        private static final Logger LOGGER = LogManager.getLogger(NioThread.class);
+
+        private int loops = 40;
+
+        private int connNum = 100;
+
+        private int totalConn = 0;
+
+        private long duration = 0;
+
+        boolean reuseAddress = false;
+
+        private InetSocketAddress remote;
+
+        @Override
+        public void run() {
+            try {
+                Selector selector = Selector.open();
+
+                long startTime = System.currentTimeMillis();
+                SocketChannel[] channels = new SocketChannel[connNum];
+                for (int i = 0; i < loops; i++) {
+                    for (int j = 0; j < connNum; j++) {
+                        LOGGER.debug("Registering", remote);
+                        SocketChannel channel = SocketChannel.open();
+                        channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
+                        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+                        channel.configureBlocking(false);
+
+                        channel.register(selector, SelectionKey.OP_CONNECT);
+                        try {
+                            channel.connect(remote);
+                        } catch (Exception ex) {
+                            ex.printStackTrace();
+                        }
+
+                        channels[j] = channel;
+                        LOGGER.debug("Registered", remote);
+                    }
+
+                    int establishedConn = 0;
+                    int selected = 0;
+                    long selectTimeout = 100;
+                    for (;;) {
+                        try {
+                            selected = selector.select(selectTimeout);
+                        } catch (IOException ex) {
+                            LOGGER.warn(ex.getMessage(), ex);
+                        }
+
+                        if (selected > 0) {
+                            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                            while (it.hasNext()) {
+                                SelectionKey key = it.next();
+                                it.remove();
+
+                                if (!key.isValid()) {
+                                    continue;
+                                }
+
+                                if (key.isConnectable()) {
+                                    LOGGER.debug("Connecting", remote);
+                                    SocketChannel channel = (SocketChannel) key.channel();
+
+                                    while (!channel.finishConnect()) {
+
+                                    }
+
+                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+                                    establishedConn++;
+                                    LOGGER.debug("Connected", remote);
+                                    totalConn++;
+                                    //System.out.println(establishedConn);
+                                }
+                            }
+                        }
+
+                        if (establishedConn == connNum) {
+                            LOGGER.debug("All connetions established.", remote);
+                            break;
+                        }
+                    }
+
+                    for (int j = 0; j < connNum; j++) {
+                        SocketChannel channel = channels[j];
+                        LOGGER.debug("Closing session.");
+                        channel.close();
+                        LOGGER.debug("Closed session.");
+                    }
+                }
+
+                duration = System.currentTimeMillis() - startTime;
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+    }
+
+    public static class Nio2Thread extends Thread {
+
+        private static final Logger LOGGER = LogManager.getLogger(Nio2Thread.class);
+
+        private int loops = 40;
+
+        private int connNum = 100;
+
+        private int totalConn = 0;
+
+        private long duration = 0;
+
+        boolean reuseAddress = false;
+
+        private InetSocketAddress remote;
+
+        private Selector selector;
+
+        private SelectorThread selectorThread;
+
+        private final AtomicBoolean wakenUp = new AtomicBoolean();
+
+        private final Object lock = new Object();
+        private final Object selectorLock1 = new Object();
+        private final Object selectorLock2 = new Object();
+
+        private volatile boolean allConns = false;
+
+        public Nio2Thread() {
+            try {
+                selector = Selector.open();
+                selectorThread = new SelectorThread(selector);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+        private void wakeUp() {
+            if (wakenUp.compareAndSet(false, true)) {
+                selector.wakeup();
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                selectorThread.start();
+
+                long startTime = System.currentTimeMillis();
+                SocketChannel[] channels = new SocketChannel[connNum];
+                for (int i = 0; i < loops; i++) {
+                    allConns = false;
+                    for (int j = 0; j < connNum; j++) {
+                        LOGGER.debug("SocketChannel.open");
+                        SocketChannel channel = SocketChannel.open();
+                        channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
+                        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+                        channel.configureBlocking(false);
+                        channels[j] = channel;
+
+                        LOGGER.debug("Registering");
+                        synchronized (selectorLock2) {
+                            wakeUp();
+                            synchronized (selectorLock1) {
+                                channel.register(selector, SelectionKey.OP_CONNECT);
+                            }
+                            try {
+                                LOGGER.debug("channel.connect");
+                                channel.connect(remote);
+                            } catch (Exception ex) {
+                                ex.printStackTrace();
+                            }
+                        }
+
+                        LOGGER.debug("Registered", remote);
+                    }
+
+                    while (!allConns) {
+                        synchronized (lock) {
+                            lock.wait(50);
+                        }
+                    }
+
+                    for (int j = 0; j < connNum; j++) {
+                        SocketChannel channel = channels[j];
+                        LOGGER.debug("Closing session.");
+                        channel.close();
+                        LOGGER.debug("Closed session.");
+                    }
+
+                }
+
+                selectorThread.working = false;
+                selectorThread.selector.wakeup();
+                selectorThread.join();
+                duration = System.currentTimeMillis() - startTime;
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+        private class SelectorThread extends Thread {
+
+            private final Selector selector;
+
+            private volatile boolean working = true;
+
+            public SelectorThread(Selector selector) {
+                this.selector = selector;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    int establishedConn = 0;
+                    int selected = 0;
+                    long selectTimeout = 10;
+                    while (working) {
+                        synchronized (selectorLock1) {
+                            try {
+                                wakenUp.set(false);
+                                selected = selector.select(selectTimeout);
+                            } catch (IOException ex) {
+                                LOGGER.warn(ex.getMessage(), ex);
+                            }
+                        }
+
+                        synchronized (selectorLock2) {
+                            if (selected > 0) {
+                                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                                while (it.hasNext()) {
+                                    SelectionKey key = it.next();
+                                    it.remove();
+
+                                    if (!key.isValid()) {
+                                        continue;
+                                    }
+
+                                    if (key.isConnectable()) {
+                                        LOGGER.debug("Connecting");
+                                        SocketChannel channel = (SocketChannel) key.channel();
+
+                                        try {
+                                            while (!channel.finishConnect()) {
+
+                                            }
+
+                                            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+                                            establishedConn++;
+                                            LOGGER.debug("Connected");
+                                            totalConn++;
+                                        } catch (Exception e) {
+                                            key.cancel();
+                                            e.printStackTrace();
+                                        }
+
+                                        //System.out.println(establishedConn);
+                                    }
+                                }
+                            }
+                        }
+
+                        if (establishedConn >= connNum) {
+                            LOGGER.debug("All connections established.", remote);
+                            synchronized (lock) {
+                                allConns = true;
+                                lock.notifyAll();
+                            }
+
+                            establishedConn = 0;
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Fri Oct 13 07:34:51 2017 +0200
@@ -28,9 +28,9 @@
 
     public static final String TYPE = "asynch";
 
-    private long waitTimeout = 100;
+    private long waitTimeout = 10;
 
-    private long windowPeriod = 10;
+    private long windowPeriod = 1_000;
 
     private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue();
 
@@ -76,6 +76,15 @@
     }
 
     @Override
+    public void close() {
+        super.close();
+
+        synchronized (lock) {
+            lock.notifyAll();
+        }
+    }
+
+    @Override
     protected void closeAllConnections() {
         if (logger.isDebugEnabled()) {
             logger.debug("Closing all connections.");
@@ -85,8 +94,9 @@
             boolean wait;
             do {
                 wait = false;
+                //processTimeouts(true);
                 for (HttpFlowContext flowContext : sessions.values()) {
-                    if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) {
+                    if (flowContext.state < HttpFlowContext.STATE_DISCONNECTED) {
                         wait = true;
                         break;
                     }
@@ -180,7 +190,7 @@
 
     private void addEvent(Event event, TasksTimeWindow window) {
         switch (event.getType()) {
-            case HttpSessionPayloadEvent.TYPE: {
+            /*case HttpSessionPayloadEvent.TYPE: {
                 Event newEvent = eventInstanceForWorker(event);
                 long time = newEvent.getTimestamp();
                 HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent;
@@ -199,7 +209,7 @@
                 }
 
                 break;
-            }
+            }*/
             case SessionStatusEvent.TYPE: {
                 Event newEvent = eventInstanceForWorker(event);
                 SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
@@ -222,6 +232,7 @@
 
     @Override
     public void handle(Event event) {
+        //System.out.println(event.getTimestamp());
         synchronized (readerLock) {
             while (processWindow) {
                 try {
@@ -232,16 +243,16 @@
             }
         }
 
+        TasksTimeWindow window = getWindow(event.getTimestamp(), true);
+        if (currentWindow == null) {
+            currentWindow = windows.peek();
+        } else if (window != currentWindow) {
+            currentWindow = windows.peek();
+            processWindow = true;
+        }
+
+        addEvent(event, window);
         synchronized (lock) {
-            TasksTimeWindow window = getWindow(event.getTimestamp(), true);
-            if (currentWindow == null) {
-                currentWindow = window;
-            } else if (window != currentWindow) {
-                currentWindow = windows.peek();
-                processWindow = true;
-            }
-
-            addEvent(event, window);
             if (processWindow) {
                 lock.notifyAll();
             }
@@ -258,28 +269,44 @@
                     if (flowContext == null) {
                         connect(statusEvent);
                     } else {
-                        switch (flowContext.state()) {
-                            case HttpFlowContext.STATE_RESP_RECEIVED:
-                            case HttpFlowContext.STATE_CONNECTED:
-                            case HttpFlowContext.STATE_ERROR:
-                                close(statusEvent);
-                                return false;
-                            case HttpFlowContext.STATE_DISCONNECTED:
-                                connect(statusEvent);
+                        synchronized (flowContext) {
+                            if (processTimeout(flowContext)) {
                                 return true;
-                            default:
-                                return false;
+                            }
+
+                            switch (flowContext.state()) {
+                                case HttpFlowContext.STATE_RESP_RECEIVED:
+                                case HttpFlowContext.STATE_CONNECTED:
+                                case HttpFlowContext.STATE_ERROR:
+                                    closeSession(statusEvent);
+                                    return false;
+                                case HttpFlowContext.STATE_DISCONNECTED:
+                                    connect(statusEvent);
+                                    return true;
+                                default:
+                                    return false;
+                            }
                         }
                     }
                 } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     HttpFlowContext flowContext = flowContext(statusEvent);
                     if (flowContext == null) {
                         return true;
-                    } else if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                        close(statusEvent);
-                        return true;
-                    } else {
-                        return false;
+                    }
+
+                    synchronized (flowContext) {
+                        if (processTimeout(flowContext)) {
+                            return true;
+                        }
+
+                        if (flowContext.state != HttpFlowContext.STATE_REQ_SENT
+                                && flowContext.state != HttpFlowContext.STATE_CONNECTING
+                                && flowContext.state != HttpFlowContext.STATE_DISCONNECTING) {
+                            closeSession(statusEvent);
+                            return true;
+                        } else {
+                            return false;
+                        }
                     }
                 }
 
@@ -289,26 +316,32 @@
                 SessionEvent sessEvent = (SessionEvent) event;
                 HttpFlowContext flowContext = flowContext(sessEvent);
                 if (flowContext != null) {
-                    switch (flowContext.state) {
-                        case HttpFlowContext.STATE_CONNECTING:
-                        case HttpFlowContext.STATE_REQ_SENT:
-                            return false;
-                        case HttpFlowContext.STATE_CONNECTED:
-                        case HttpFlowContext.STATE_RESP_RECEIVED:
-                        case HttpFlowContext.STATE_ERROR:
-                            if (send(flowContext, (HttpSessionPayloadEvent) event)) {
-                                return true;
-                            }
+                    synchronized (flowContext) {
+                        if (processTimeout(flowContext)) {
+                            return true;
+                        }
 
-                            return false;
-                        case HttpFlowContext.STATE_DISCONNECTING:
-                        case HttpFlowContext.STATE_DISCONNECTED:
-                            if (connectPartialSession) {
-                                connect(sessEvent);
-                            } else {
-                                return true;
-                            }
-                            break;
+                        switch (flowContext.state) {
+                            case HttpFlowContext.STATE_CONNECTING:
+                            case HttpFlowContext.STATE_REQ_SENT:
+                                return false;
+                            case HttpFlowContext.STATE_CONNECTED:
+                            case HttpFlowContext.STATE_RESP_RECEIVED:
+                            case HttpFlowContext.STATE_ERROR:
+                                if (send(flowContext, (HttpSessionPayloadEvent) event)) {
+                                    return true;
+                                }
+
+                                return false;
+                            case HttpFlowContext.STATE_DISCONNECTING:
+                            case HttpFlowContext.STATE_DISCONNECTED:
+                                if (connectPartialSession) {
+                                    connect(sessEvent);
+                                } else {
+                                    return true;
+                                }
+                                break;
+                        }
                     }
                 } else if (connectPartialSession) {
                     connect(sessEvent);
@@ -320,8 +353,14 @@
                 SessionEvent sessEvent = (SessionEvent) event;
                 HttpFlowContext flowContext = flowContext(sessEvent);
                 if (flowContext != null) {
-                    return (flowContext.state == HttpFlowContext.STATE_RESP_RECEIVED
-                            || flowContext.state >= HttpFlowContext.STATE_DISCONNECTING);
+                    if (processTimeout(flowContext)) {
+                        return true;
+                    }
+
+                    synchronized (flowContext) {
+                        return (flowContext.state == HttpFlowContext.STATE_RESP_RECEIVED
+                                || flowContext.state >= HttpFlowContext.STATE_DISCONNECTING);
+                    }
                 }
 
                 return true;
@@ -333,18 +372,31 @@
 
     @Override
     public void run() {
-
-        synchronized (lock) {
-            working = true;
-            while (working) {
+        working = true;
+        while (working) {
+            synchronized (lock) {
                 try {
                     lock.wait();
+                } catch (InterruptedException ignore) {
+                }
+            }
 
-                    boolean dataLoopEnd = false;
-                    boolean dataEnd = false;
+            if (processWindow) {
+                long startTime = System.currentTimeMillis();
+                int tasksNum = -1;
 
+                boolean dataLoopEnd = false;
+                boolean dataEnd = false;
+
+                try {
+
+                    logger.info("processing: " + currentWindow);
                     for (;;) {
                         Iterator<Task> it = currentWindow.tasks.iterator();
+                        if (tasksNum == -1) {
+                            tasksNum = currentWindow.tasks.size();
+                        }
+
                         while (it.hasNext()) {
                             Task task = it.next();
                             switch (task.type()) {
@@ -385,11 +437,12 @@
                             break;
                         } else if (!flowStateChanged) {
                             flowStateChanged = false;
-                            try {
-                                lock.wait(waitTimeout);
-                            } catch (InterruptedException ignore) {
+                            synchronized (lock) {
+                                try {
+                                    lock.wait(waitTimeout);
+                                } catch (InterruptedException ignore) {
+                                }
                             }
-
                         } else {
                             flowStateChanged = false;
                         }
@@ -399,19 +452,27 @@
                         closeAllConnections();
                     }
 
-                    if (dataEnd) {
-                        working = false;
-                    }
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         logger.debug(e.getMessage(), e);
                     }
                 } finally {
+                    long endTime = System.currentTimeMillis() - startTime;
+                    logger.info("{} tasks processed in {} ms.", tasksNum, endTime);
+
+                    //System.out.println("remove window: " + currentWindow);
                     removeWindow(currentWindow);
+
+                    if (dataEnd) {
+                        working = false;
+                    }
+
+                    currentWindow = null;
                     processWindow = false;
                     synchronized (readerLock) {
                         readerLock.notifyAll();
                     }
+
                 }
             }
         }
@@ -559,13 +620,6 @@
         }
 
         @Override
-        public String toString() {
-            return "TimeWindow{"
-                    + "startTimestamp=" + startTime
-                    + ", endTimestamp=" + endTime + '}';
-        }
-
-        @Override
         public int hashCode() {
             int hash = 5;
             hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32));
@@ -586,5 +640,13 @@
                     && this.endTime == other.endTime;
         }
 
+        @Override
+        public String toString() {
+            return "TimeWindow{"
+                    + "startTimestamp=" + startTime
+                    + ", endTimestamp=" + endTime
+                    + ", tasks=" + tasks.size() + '}';
+        }
+
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Fri Oct 13 07:34:51 2017 +0200
@@ -42,10 +42,10 @@
     static {
         Map<Integer, Long> defaultTimeouts = new HashMap<>();
         defaultTimeouts.put(HttpFlowContext.STATE_CONNECTING, 10_000L);
-        defaultTimeouts.put(HttpFlowContext.STATE_CONNECTED, 60_000L);
-        defaultTimeouts.put(HttpFlowContext.STATE_REQ_SENT, 30_000L);
-        defaultTimeouts.put(HttpFlowContext.STATE_RESP_RECEIVED, 60_000L);
-        defaultTimeouts.put(HttpFlowContext.STATE_ERROR, 60_000L);
+        defaultTimeouts.put(HttpFlowContext.STATE_CONNECTED, 10_000L);
+        defaultTimeouts.put(HttpFlowContext.STATE_REQ_SENT, 10_000L);
+        defaultTimeouts.put(HttpFlowContext.STATE_RESP_RECEIVED, 10_000L);
+        defaultTimeouts.put(HttpFlowContext.STATE_ERROR, 1_000L);
         defaultTimeouts.put(HttpFlowContext.STATE_DISCONNECTING, 2_000L);
         defaultTimeouts.put(HttpFlowContext.STATE_DISCONNECTED, 0L);
         DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
@@ -251,18 +251,22 @@
     }
 
     protected HttpFlowContext connect(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                HttpFlowContext flowContext = register(session);
-                if (flowContext != null) {
+        try {
+            HttpFlowContext flowContext = register(session);
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Connecting.");
+                }
+
+                synchronized (flowContext) {
                     emitter.connect(session, this, index);
                     return flowContext;
                 }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
             }
-            return null;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
+        return null;
     }
 
     @Override
@@ -271,7 +275,7 @@
             for (Map.Entry<SessionInfo, HttpFlowContext> entry : sessions.entrySet()) {
                 HttpFlowContext flowContext = entry.getValue();
                 try {
-                    closeSession(flowContext);
+                    HttpFlowBasedClientWorker.this.closeSession(flowContext);
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
@@ -284,53 +288,41 @@
         }
     }
 
-    protected void close(SessionEvent sessionEvent) {
+    @Override
+    public void close(SessionInfo session) {
+        /*try {
+            HttpFlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                synchronized (flowContext) {
+                    closeSession(flowContext);
+                }
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getMessage(), e);
+            }
+        }*/
+    }
+
+    protected void closeSession(SessionEvent sessionEvent) {
         close(sessionEvent.getSessionInfo());
     }
 
-    protected void close(HttpFlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void close(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                HttpFlowContext flowContext = flowContext(session);
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
     protected void closeSession(HttpFlowContext flowContext) {
-        synchronized (lock) {
-            if (flowContext != null) {
+        if (flowContext != null) {
+            synchronized (flowContext) {
                 changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING);
             }
         }
     }
 
     protected void removeFlowContext(HttpFlowContext flowContext) {
-        synchronized (lock) {
-            debug(flowContext, "removeFlowContext");
-            sessions.remove(flowContext.sessionInfo());
-        }
+        debug(flowContext, "removeFlowContext");
+        sessions.remove(flowContext.sessionInfo());
     }
 
     protected void reconnect(HttpFlowContext flowContext) {
-        synchronized (lock) {
+        synchronized (flowContext) {
             try {
                 if (logger.isDebugEnabled()) {
                     debug(flowContext, "Reconnect (state: {}).", flowContext.stateString());
@@ -346,10 +338,8 @@
     }
 
     protected void closeAllConnections() {
-        synchronized (lock) {
-            for (HttpFlowContext flowContext : sessions.values()) {
-                closeSession(flowContext);
-            }
+        for (HttpFlowContext flowContext : sessions.values()) {
+            HttpFlowBasedClientWorker.this.closeSession(flowContext);
         }
     }
 
@@ -387,44 +377,54 @@
 
     @Override
     public void channelActive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                            context.getLocalAddress(),
-                            context.getRemoteAddress());
-                }
+        HttpFlowContext flowContext = flowContext(context);
+        if (flowContext != null) {
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                        context.getLocalAddress(),
+                        context.getRemoteAddress());
+            }
 
+            synchronized (flowContext) {
                 flowContext.channelContext = context;
                 changeFlowState(flowContext, STATE_CONNECTED);
             }
+        }
 
+        synchronized (lock) {
             lock.notifyAll();
         }
     }
 
     @Override
     public void channelInactive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel inactive.");
-                }
+        HttpFlowContext flowContext = flowContext(context);
+        if (flowContext != null) {
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Channel inactive.");
+            }
 
+            synchronized (flowContext) {
                 changeFlowState(flowContext, STATE_DISCONNECTED);
             }
+        }
+
+        synchronized (lock) {
             lock.notifyAll();
         }
     }
 
     @Override
     public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
+        HttpFlowContext flowContext = flowContext(context);
+        try {
+            if (flowContext != null) {
+                synchronized (flowContext) {
+                    if (flowContext.state() >= HttpFlowContext.STATE_DISCONNECTING) {
+                        debug(flowContext, "Data received but flow context in state '{}'.", flowContext.stateString());
+                        return;
+                    }
+
                     HttpFullMessageDecoder decoder = flowContext.decoder;
                     HttpRequest req = null;
                     if (flowContext.sentEvent != null) {
@@ -496,43 +496,51 @@
                         changeFlowState(flowContext, HttpFlowContext.STATE_RESP_RECEIVED);
                     }
                 }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
             }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, e.getMessage(), e);
+            }
+        }
 
+        synchronized (lock) {
             lock.notifyAll();
         }
     }
 
     @Override
     public void dataWriteStart(ChannelContext context) {
-        synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent != null) {
-                long now = timeGenerator.currentTimeMillis();
+        HttpFlowContext flowContext = flowContext(context);
+        if (flowContext != null && flowContext.sentEvent != null) {
+            long now = timeGenerator.currentTimeMillis();
+            synchronized (flowContext) {
                 flowContext.sendStartTimestamp = now;
                 flowContext.sentEvent.getRequest().setTag(TAG_TIME_START, now);
             }
         }
+
     }
 
     @Override
     public void dataWritten(ChannelContext context) throws Exception {
+        HttpFlowContext flowContext = flowContext(context);
+
+        if (flowContext != null) {
+            synchronized (flowContext) {
+                if (flowContext.sentEvent != null) {
+                    long now = timeGenerator.currentTimeMillis();
+                    if (collectMetric) {
+                        synchronized (metric) {
+                            metric.addRequestSendingTime(now - flowContext.sendStartTimestamp);
+                        }
+                    }
+
+                    flowContext.sentEvent.getRequest().setTag(TAG_TIME_END, now);
+                }
+            }
+        }
+
         synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent != null) {
-                long now = timeGenerator.currentTimeMillis();
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.addRequestSendingTime(now - flowContext.sendStartTimestamp);
-                    }
-                }
-
-                flowContext.sentEvent.getRequest().setTag(TAG_TIME_END, now);
-            }
-
             lock.notifyAll();
 
         }
@@ -544,99 +552,123 @@
             logger.debug("Error occured. " + cause.getMessage(), cause);
         }
 
+        HttpFlowContext flowContext = flowContext(context);
+        if (flowContext != null) {
+            synchronized (flowContext) {
+                closeSession(flowContext);
+            }
+        }
+
         synchronized (lock) {
-            HttpFlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                changeFlowState(flowContext, HttpFlowContext.STATE_ERROR);
-            }
-
             lock.notifyAll();
         }
     }
 
-    protected boolean send(HttpFlowContext context, HttpSessionPayloadEvent event) {
-        synchronized (lock) {
+    protected boolean send(HttpFlowContext flowContext, HttpSessionPayloadEvent event) {
+        int reqSize = -1;
+        boolean res = false;
+        synchronized (flowContext) {
+            if (flowContext.state() < HttpFlowContext.STATE_CONNECTED
+                    || flowContext.state() >= HttpFlowContext.STATE_DISCONNECTING) {
+                debug(flowContext, "Data sending attempt but flow context in state '{}'.", flowContext.stateString());
+                return false;
+            }
+
             if (event.getRequest() != null) {
                 HttpRequest req = event.getRequest();
-                if (filterChain.filterOutbound(req, event.getResponse(), context) == HttpFilter.DENY) {
+                if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == HttpFilter.DENY) {
                     return false;
                 }
 
-                reqEncoder.encodeHeader(req, context.buffer);
-                long headerSize = context.buffer.readableBytes();
-                reqEncoder.encodeContent(req, context.buffer);
+                reqEncoder.encodeHeader(req, flowContext.buffer);
+                long headerSize = flowContext.buffer.readableBytes();
+                reqEncoder.encodeContent(req, flowContext.buffer);
 
                 req.setTag(TAG_HEADER_SIZE, headerSize);
-                req.setTag(TAG_CONTENT_SIZE, (long) (context.buffer.readableBytes() - headerSize));
+                req.setTag(TAG_CONTENT_SIZE, (long) (flowContext.buffer.readableBytes() - headerSize));
 
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.incRequestsNum();
-                        metric.addRequestSize(context.buffer.readableBytes());
-                        //metric.addRequestUrl(req.getUrl());
-                    }
-                }
+                reqSize = flowContext.buffer.readableBytes();
 
                 try {
-                    changeFlowState(context, HttpFlowContext.STATE_REQ_SENT);
-                    context.sentEvent = event;
+                    changeFlowState(flowContext, HttpFlowContext.STATE_REQ_SENT);
+                    flowContext.sentEvent = event;
 
-                    context.channelContext.writeAndFlush(context.buffer);
+                    flowContext.channelContext.writeAndFlush(flowContext.buffer);
                     if (logger.isDebugEnabled()) {
-                        debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer.length());
+                        debug(flowContext, "Request '{}' sending ({} bytes).", req.getUrl(), flowContext.buffer.length());
                     }
 
-                    context.buffer.clear();
+                    flowContext.buffer.clear();
 
-                    return true;
+                    res = true;
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
-                        debug(context, e.getMessage(), e);
+                        debug(flowContext, e.getMessage(), e);
                     }
                 }
             }
         }
 
-        return false;
+        if (collectMetric && reqSize >= 0) {
+            synchronized (metric) {
+                metric.incRequestsNum();
+                metric.addRequestSize(reqSize);
+                //metric.addRequestUrl(req.getUrl());
+            }
+        }
+
+        return res;
     }
 
     protected void processTimeouts() {
-        synchronized (lock) {
-            try {
-                long now = timeGenerator.currentTimeMillis();
-                if (nextCheckTimeoutsTime == -1) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                } else if (nextCheckTimeoutsTime > now) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                    for (HttpFlowContext flowContext : sessions.values()) {
-                        if (flowContext.timeouted()) {
-                            if (logger.isDebugEnabled()) {
-                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
-                                        flowContext.sessionInfo(), flowContext.stateString());
-                            }
+        processTimeouts(false);
+    }
 
-                            switch (flowContext.state) {
-                                case HttpFlowContext.STATE_CONNECTING:
-                                case HttpFlowContext.STATE_CONNECTED:
-                                case HttpFlowContext.STATE_REQ_SENT:
-                                case HttpFlowContext.STATE_ERROR:
-                                    closeSession(flowContext);
-                                    break;
-                                case HttpFlowContext.STATE_RESP_RECEIVED:
-                                    //Dziwny blad nie powinien wystepowac
-                                    break;
-                                case HttpFlowContext.STATE_DISCONNECTING:
-                                case HttpFlowContext.STATE_DISCONNECTED:
-                                    removeFlowContext(flowContext);
-                                    break;
-                            }
-                        }
+    protected boolean processTimeout(HttpFlowContext flowContext) {
+        if (flowContext.timeouted()) {
+            info(flowContext, "Session timed out (state '{}').",
+                    flowContext.sessionInfo(), flowContext.stateString());
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Session timed out (state '{}').",
+                        flowContext.sessionInfo(), flowContext.stateString());
+            }
+
+            switch (flowContext.state) {
+                case HttpFlowContext.STATE_CONNECTING:
+                case HttpFlowContext.STATE_CONNECTED:
+                case HttpFlowContext.STATE_REQ_SENT:
+                case HttpFlowContext.STATE_ERROR:
+                case HttpFlowContext.STATE_RESP_RECEIVED:
+                    closeSession(flowContext);
+                    break;
+                case HttpFlowContext.STATE_DISCONNECTING:
+                case HttpFlowContext.STATE_DISCONNECTED:
+                    removeFlowContext(flowContext);
+                    break;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    protected void processTimeouts(boolean force) {
+        try {
+            long now = timeGenerator.currentTimeMillis();
+            if (nextCheckTimeoutsTime == -1 && !force) {
+                nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+            } else if (nextCheckTimeoutsTime > now || force) {
+                nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                for (HttpFlowContext flowContext : sessions.values()) {
+                    synchronized (flowContext) {
+                        processTimeout(flowContext);
                     }
                 }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getMessage(), e);
             }
         }
     }
@@ -659,6 +691,10 @@
         log(flowContext, Level.DEBUG, message, cause);
     }
 
+    protected final void info(HttpFlowContext flowContext, String message, Object... args) {
+        log(flowContext, Level.INFO, message, args);
+    }
+
     protected final void error(HttpFlowContext flowContext, String message, Throwable cause) {
         log(flowContext, Level.ERROR, message, cause);
     }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Fri Oct 13 07:34:51 2017 +0200
@@ -122,7 +122,7 @@
                 LocalHttpFlowContext indexFlowContext = it.next();
                 if (indexFlowContext.eventsQueue.isEmpty()
                         && indexFlowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                    close(flowContext);
+                    closeSession(flowContext);
                     if (--diff == 0) {
                         break;
                     }
@@ -154,7 +154,7 @@
             if (localFlowContext.state < HttpFlowContext.STATE_DISCONNECTING
                     && localFlowContext.state != HttpFlowContext.STATE_REQ_SENT
                     && localFlowContext.eventsQueue.isEmpty()) {
-                close(flowContext);
+                closeSession(flowContext);
                 return;
             }
         }
@@ -169,7 +169,7 @@
                 SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                 if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     localFlowContext.eventsQueue.poll();
-                    close((SessionStatusEvent) event);
+                    closeSession((SessionStatusEvent) event);
                 }
             } else if (event.getType() == HttpSessionPayloadEvent.TYPE
                     && canSend(flowContext)) {
@@ -203,7 +203,7 @@
                         if (flowContext != null) {
                             if (flowContext.eventsQueue.isEmpty()
                                     && flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
+                                closeSession(statusEvent);
                             } else {
                                 addToQueue(flowContext, event);
                             }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Fri Oct 13 07:34:51 2017 +0200
@@ -160,7 +160,7 @@
                         currFlowContext = flowContext((SessionEvent) event);
                         if (currFlowContext != null) {
                             if (currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
+                                closeSession(statusEvent);
                             }
                         }
                     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Fri Oct 13 07:34:51 2017 +0200
@@ -67,7 +67,7 @@
         this.srcIp = srcIp;
         this.dstIp = dstIp;
         this.sessionStatus = 0;
-        this.hashCode = (srcPort + dstPort << 16) ^ srcIp.hashCode() ^ dstIp.hashCode();
+        this.hashCode = (srcPort + (dstPort << 16)) ^ srcIp.hashCode() ^ dstIp.hashCode();
     }
 
     public String getSourceName() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContextNoExecutors.java	Fri Oct 13 07:34:51 2017 +0200
@@ -0,0 +1,108 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.SessionInfo;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioChannelContextNoExecutors implements ChannelContext {
+
+    private final NioEmitterWorkerNoExecutors worker;
+
+    private final SessionInfo sessionInfo;
+
+    private final SocketChannel channel;
+
+    private final Queue<ByteBuffer> dataQueue;
+
+    private SocketAddress localAddress;
+
+    private SocketAddress remoteAddress;
+
+    private SelectionKey key;
+
+    public NioChannelContextNoExecutors(NioEmitterWorkerNoExecutors worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+        this.worker = worker;
+        this.channel = channel;
+        this.remoteAddress = remoteAddress;
+        this.sessionInfo = sessionInfo;
+        this.dataQueue = new LinkedList<>();
+
+    }
+
+    Queue<ByteBuffer> dataQueue() {
+        return dataQueue;
+    }
+
+    void selectionKey(SelectionKey key) {
+        this.key = key;
+    }
+
+    private void addToQeueu(ByteBuffer buffer) throws IOException {
+        dataQueue.add(buffer);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel.isConnected();
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return channel.isConnectionPending();
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        addToQeueu(ByteBuffer.wrap(data, offset, length));
+    }
+
+    @Override
+    public void write(ByteBuff data) throws IOException {
+        addToQeueu(data.toNioByteBuffer());
+    }
+
+    @Override
+    public void flush() {
+        worker.flush(key);
+    }
+
+    @Override
+    public void close() throws IOException {
+        worker.requestClose(key);
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        try {
+            if (localAddress == null && channel.getLocalAddress() != null) {
+                localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress());
+            }
+        } catch (Exception e) {
+        }
+
+        return localAddress;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    @Override
+    public SessionInfo getSessionInfo() {
+        return sessionInfo;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Oct 09 10:02:28 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Fri Oct 13 07:34:51 2017 +0200
@@ -55,7 +55,9 @@
 
     private EmitterMetric metric;
 
-    NioEmitterWorker(int index) throws IOException {
+    private ExecutorService exec = Executors.newSingleThreadExecutor();
+
+    public NioEmitterWorker(int index) throws IOException {
         super("NioEmitterWorker-" + index);
         selector = Selector.open();
     }
@@ -89,17 +91,6 @@
         this.sessionMapper = sessionMapper;
     }
 
-    public void setWorking(boolean working) {
-        this.working = working;
-        if (this.working && !working) {
-            try {
-                selector.close();
-            } catch (IOException ex) {
-                LOGGER.warn(ex.getMessage(), ex);
-            }
-        }
-    }
-
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
@@ -135,6 +126,34 @@
         return working;
     }
 
+    public void setWorking(boolean working) {
+        if (this.working && !working) {
+            this.working = false;
+            try {
+                selector.close();
+            } catch (IOException ex) {
+                LOGGER.warn(ex.getMessage(), ex);
+            }
+
+            if (!tasks.isEmpty()) {
+                LOGGER.debug("{} tasks remains.", tasks.size());
+            }
+
+            tasks.clear();
+            if (exec != null) {
+                try {
+                    exec.shutdownNow();
+                    exec.awaitTermination(200, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                }
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Shutdowned.");
+            }
+        }
+    }
+
     @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
@@ -268,10 +287,18 @@
                 LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
             }
 
-            keyContext.handler.channelActive(keyContext.channelContext);
+            exec.submit(() -> {
+                try {
+                    keyContext.handler.channelActive(keyContext.channelContext);
+                } catch (Exception e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+
+            });
+
             setOpRead(key);
         } catch (Exception ex) {
-
+            LOGGER.debug(ex.getMessage(), ex);
         }
     }
 
@@ -283,7 +310,10 @@
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
 
-        keyContext.handler.dataWriteStart(keyContext.channelContext);
+        exec.submit(() -> {
+            keyContext.handler.dataWriteStart(keyContext.channelContext);
+        });
+
         Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
         int written = 0;
         try {
@@ -295,17 +325,7 @@
 
                     if (res == -1) {
                         if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
-                                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-                        }
-
-                        if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
-                                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-                        }
-
-                        if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                            LOGGER.debug("Session ({} -> {}) closed by server.",
                                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
                         }
 
@@ -332,12 +352,14 @@
         }
 
         //TODO Operacje na handlerach powinny przechodzic przez Executor
-        try {
-            keyContext.handler.dataWritten(keyContext.channelContext);
-            LOGGER.debug("Write handled.");
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
+        exec.submit(() -> {
+            try {
+                keyContext.handler.dataWritten(keyContext.channelContext);
+                LOGGER.debug("Write handled.");
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        });
 
         setOpRead(key);
         clearOpWrite(key);
@@ -382,27 +404,28 @@
         }
 
         if (totalReaded > 0) {
-            try {
-                keyContext.handler.dataReceived(keyContext.channelContext, buff);
-                LOGGER.debug("Read handled.");
-            } catch (Exception e) {
-                LOGGER.debug(e.getMessage(), e);
-            }
+            exec.submit(() -> {
+                try {
+                    keyContext.handler.dataReceived(keyContext.channelContext, buff);
+                    LOGGER.debug("Read handled.");
+                } catch (Exception e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+            });
         }
 
         if (readed == -1) {
             if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                LOGGER.debug("Session ({} -> {}) closed by server.",
                         keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
             }
-        }
 
-        if (readed == -1) {
             doClose(key);
             return;
         }
 
         keyContext.buffer.flip();
+        key.selector().wakeup();
     }
 
     private void doCatchException(SelectionKey key, Throwable cause) {
@@ -417,43 +440,45 @@
         }
 
         KeyContext keyContext = (KeyContext) key.attachment();
-
-        try {
-            keyContext.handler.errorOccured(keyContext.channelContext, cause);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
+        exec.submit(() -> {
+            try {
+                keyContext.handler.errorOccured(keyContext.channelContext, cause);
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        });
     }
 
     private void doClose(SelectionKey key) {
-        if (!key.channel().isOpen()) {
-            selector.wakeup();
-            return;
-        }
-
-        try {
-            key.channel().close();
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (key.channel().isOpen()) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+            }
+            try {
+                key.channel().close();
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
         }
 
-        KeyContext keyContext = (KeyContext) key.attachment();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        if (key.isValid()) {
+            key.cancel();
         }
 
-        try {
-            keyContext.handler.channelInactive(keyContext.channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
+        exec.submit(() -> {
+            try {
+                keyContext.handler.channelInactive(keyContext.channelContext);
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
 
-        key.cancel();
-        try {
-            keyContext.handler.channelUnregistered(keyContext.channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
+            try {
+                keyContext.handler.channelUnregistered(keyContext.channelContext);
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        });
 
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerNoExecutors.java	Fri Oct 13 07:34:51 2017 +0200
@@ -0,0 +1,770 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.st.emitter.SessionMapper;
+import com.passus.commons.Assert;
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.EmitterMetric;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioEmitterWorkerNoExecutors extends Thread implements MetricSource {
+
+    private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class);
+
+    private int index;
+
+    private final Selector selector;
+
+    private volatile boolean working = false;
+
+    private long selectTimeout = 100;
+
+    private Queue<Task> tasks = new ConcurrentLinkedQueue<>();
+
+    private SessionMapper sessionMapper;
+
+    private volatile boolean collectMetrics = false;
+
+    private long connectionTimeout = 5_000;
+
+    private EmitterMetric metric;
+
+    private boolean reusePort = true;
+
+    private boolean tcpNoDelay = true;
+
+    private final AtomicBoolean wakeUp = new AtomicBoolean();
+
+    public NioEmitterWorkerNoExecutors(int index) throws IOException {
+        super("NioEmitterWorker-" + index);
+        selector = Selector.open();
+    }
+
+    int getIndex() {
+        return index;
+    }
+
+    public boolean isReusePort() {
+        return reusePort;
+    }
+
+    public void setReusePort(boolean reusePort) {
+        this.reusePort = reusePort;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public long getSelectTimeout() {
+        return selectTimeout;
+    }
+
+    public void setSelectTimeout(long selectTimeout) {
+        Assert.greaterThanZero(selectTimeout, "selectTimeout");
+        this.selectTimeout = selectTimeout;
+    }
+
+    public long getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        if (collectMetrics && metric == null) {
+            metric = new EmitterMetric();
+            try {
+                metric.activate();
+            } catch (Exception e) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Error occured during metric activation. " + e.getMessage(), e);
+                }
+            }
+            this.collectMetrics = true;
+        } else if (!collectMetrics && metric != null) {
+            this.collectMetrics = false;
+            try {
+                metric.deactivate();
+            } catch (Exception e) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e);
+                }
+            }
+
+            metric = null;
+        }
+    }
+
+    public boolean isWorking() {
+        return working;
+    }
+
+    public void setWorking(boolean working) {
+        if (this.working && !working) {
+            this.working = false;
+            tasks.clear();
+            wakeUp();
+
+            try {
+                selector.close();
+            } catch (IOException ex) {
+                LOGGER.warn(ex.getMessage(), ex);
+            }
+
+            interrupt();
+
+            try {
+                join();
+            } catch (Exception e) {
+            }
+
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset();
+        }
+    }
+
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
+        tasks.add(new ConnectTask(sessionInfo, handler));
+        selector.wakeup();
+    }
+
+    void flush(SelectionKey key) {
+        tasks.add(new FlushTask(key));
+        selector.wakeup();
+    }
+
+    private void doRegister(SessionInfo sessionInfo, EmitterHandler handler) {
+        try {
+            ConnectionParams connParams = sessionMapper.map(sessionInfo);
+            if (connParams == null) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Unable to map session '" + sessionInfo + "'.");
+                }
+
+                try {
+                    handler.sessionInvalidated(sessionInfo);
+                } catch (Exception e) {
+                    LOGGER.debug(e.getMessage(), e);
+                }
+
+                return;
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'.");
+            }
+
+            SocketChannel channel = SocketChannel.open();
+            channel.setOption(StandardSocketOptions.SO_REUSEADDR, reusePort);
+            channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
+            channel.configureBlocking(false);
+
+            SocketAddress bindAddress = connParams.getBindAddress();
+            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            NioChannelContextNoExecutors channelContext = new NioChannelContextNoExecutors(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
+
+            try {
+                handler.channelRegistered(channelContext);
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+            }
+
+            channelContext.selectionKey(key);
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+                doClose(key);
+                return;
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Session registered '{}'.", sessionInfo);
+            }
+
+            wakeUp();
+        } catch (Exception e) {
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            LOGGER.error(e.getMessage(), e);
+        }
+
+    }
+
+    private void doFinishConnect(SelectionKey key) {
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Connecting to '{}'.", keyContext.channelContext.getRemoteAddress());
+        }
+
+        try {
+            long connStart = System.currentTimeMillis();
+            boolean timeouted = false;
+            while (!channel.finishConnect()) {
+                long now = System.currentTimeMillis();
+                if (now - connStart < connectionTimeout) {
+                    timeouted = true;
+                    break;
+                }
+            }
+
+            clearOpConnect(key);
+            if (timeouted) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Connection '{}' timed out.", keyContext.channelContext.getRemoteAddress());
+                }
+
+                throw new ConnectException("Connection timed out.");
+
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            if (collectMetrics) {
+                metric.incConnectionsErrors();
+            }
+
+            return;
+        }
+
+        try {
+            if (collectMetrics) {
+                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
+                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
+            }
+
+            keyContext.handler.channelActive(keyContext.channelContext);
+            setOpRead(key);
+        } catch (Exception ex) {
+            LOGGER.error(ex.getMessage(), ex);
+        }
+    }
+
+    private void doWrite(SelectionKey key) {
+        SocketChannel socketChannel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress());
+        }
+
+        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
+        int written = 0;
+        try {
+            ByteBuffer buffer;
+            while (!queue.isEmpty()) {
+                buffer = queue.poll();
+                while (buffer.hasRemaining()) {
+                    int res = socketChannel.write(buffer);
+
+                    if (res == -1) {
+                        doClose(key);
+                        return;
+                    }
+
+                    written += res;
+                }
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (collectMetrics) {
+            metric.updateSentBytes(written);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Written '" + written + "' to '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        //TODO Operacje na handlerach powinny przechodzic przez Executor
+        try {
+            keyContext.handler.dataWritten(keyContext.channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        setOpRead(key);
+        clearOpWrite(key);
+    }
+
+    private int read(SocketChannel channel, ByteBuffer buffer, ByteBuff out) throws IOException {
+        int readed;
+        int totalReaded = 0;
+        while ((readed = channel.read(buffer)) > 0) {
+            buffer.flip();
+            out.append(buffer.array(), buffer.position(), buffer.limit());
+            buffer.clear();
+            if (readed < 0) {
+                return -1;
+            }
+
+            totalReaded += readed;
+        }
+
+        return totalReaded;
+    }
+
+    private void doRead(SelectionKey key) {
+        SocketChannel channel = (SocketChannel) key.channel();
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Reading from '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        ByteBuffer buffer = keyContext.buffer;
+
+        keyContext.buffer.clear();
+
+        ByteBuff buff = new HeapByteBuff();
+        int totalReaded = 0;
+        int readed;
+        try {
+            while ((readed = channel.read(buffer)) > 0) {
+                buffer.flip();
+                buff.append(buffer.array(), buffer.position(), buffer.limit());
+                buffer.clear();
+                totalReaded += readed;
+            }
+        } catch (IOException e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (collectMetrics) {
+            metric.updateReceivedBytes(totalReaded);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(totalReaded + " readed from '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        try {
+            keyContext.handler.dataReceived(keyContext.channelContext, buff);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        if (readed == -1) {
+            doClose(key);
+            return;
+        }
+
+        keyContext.buffer.flip();
+    }
+
+    private void doCatchException(SelectionKey key, Throwable cause) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Error occured. " + cause.getMessage(), cause);
+        }
+
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        try {
+            keyContext.handler.errorOccured(keyContext.channelContext, cause);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+    }
+
+    private void doClose(SelectionKey key) {
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (key.channel().isOpen()) {
+            try {
+                key.channel().close();
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+        try {
+            keyContext.handler.channelInactive(keyContext.channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        if (key.isValid()) {
+            key.cancel();
+        }
+
+        try {
+            keyContext.handler.channelUnregistered(keyContext.channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+    }
+
+    private void sslDoHandshake(SocketChannel socketChannel, SSLEngine engine,
+            ByteBuffer myNetData, ByteBuffer peerNetData) throws Exception {
+
+        // Create byte buffers to use for holding application data
+        int appBufferSize = engine.getSession().getApplicationBufferSize();
+        ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
+        ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
+
+        // Begin handshake
+        engine.beginHandshake();
+        SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus();
+
+        // Process handshaking message
+        while (hs != SSLEngineResult.HandshakeStatus.FINISHED
+                && hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+            switch (hs) {
+                case NEED_UNWRAP:
+                    // Receive handshaking data from peer
+                    if (socketChannel.read(peerNetData) < 0) {
+                        // Handle closed channel
+                    }
+
+                    // Process incoming handshaking data
+                    peerNetData.flip();
+                    SSLEngineResult res = engine.unwrap(peerNetData, peerAppData);
+                    peerNetData.compact();
+                    hs = res.getHandshakeStatus();
+
+                    switch (res.getStatus()) {
+                        case OK:
+                            // Handle OK status
+                            break;
+                        case BUFFER_UNDERFLOW:
+                            break;
+                        case BUFFER_OVERFLOW:
+                            break;
+                        case CLOSED:
+                            break;
+                        // Handle other status: BUFFER_UNDERFLOW, BUFFER_OVERFLOW, CLOSED
+                    }
+                    break;
+
+                case NEED_WRAP:
+                    // Empty the local network packet buffer.
+                    myNetData.clear();
+
+                    // Generate handshaking data
+                    res = engine.wrap(myAppData, myNetData);
+                    hs = res.getHandshakeStatus();
+
+                    // Check status
+                    switch (res.getStatus()) {
+                        case OK:
+                            myNetData.flip();
+
+                            // Send the handshaking data to peer
+                            while (myNetData.hasRemaining()) {
+                                if (socketChannel.write(myNetData) < 0) {
+                                    // Handle closed channel
+                                }
+                            }
+                            break;
+
+                        // Handle other status:  BUFFER_OVERFLOW, BUFFER_UNDERFLOW, CLOSED
+                    }
+                    break;
+
+                case NEED_TASK:
+                    // Handle blocking tasks
+                    break;
+
+                // Handle other status:  // FINISHED or NOT_HANDSHAKING
+            }
+        }
+        // Processes after handshaking
+    }
+
+    private void sslDoWrap(SocketChannel socketChannel) {
+
+    }
+
+    private void sslDoUnwrap(SocketChannel socketChannel, SSLEngine engine, ByteBuffer netData, ByteBuffer appData) {
+        SSLEngineResult res;
+        try {
+            res = engine.unwrap(netData, appData);
+        } catch (Exception e) {
+            return;
+        }
+
+        switch (res.getStatus()) {
+            case OK:
+                break;
+            case CLOSED:
+                break;
+            case BUFFER_OVERFLOW:
+                // Maybe need to enlarge the peer application data buffer.
+                if (engine.getSession().getApplicationBufferSize()
+                        > appData.capacity()) {
+                    // enlarge the peer application data buffer
+                } else {
+                    // compact or clear the buffer
+                }
+                // retry the operation
+                break;
+
+            case BUFFER_UNDERFLOW:
+                // Maybe need to enlarge the peer network packet buffer
+                if (engine.getSession().getPacketBufferSize()
+                        > netData.capacity()) {
+                    // enlarge the peer network packet buffer
+                } else {
+                    // compact or clear the buffer
+                }
+                // obtain more inbound network data and then retry the operation
+                break;
+        }
+    }
+
+    private void wakeUp() {
+        if (wakeUp.compareAndSet(false, true)) {
+            selector.wakeup();
+        }
+    }
+
+    private void clearOpConnect(SelectionKey key) {
+        if (!key.isValid() || !key.isConnectable()) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+        key.selector().wakeup();
+    }
+
+    private void setOpRead(SelectionKey key) {
+        if (!key.isValid() || key.isReadable()) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+        wakeUp();
+    }
+
+    private void clearOpRead(SelectionKey key) {
+        if (!key.isValid() || !key.isReadable()) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    }
+
+    private void setOpWrite(SelectionKey key) {
+        if (!key.isValid() || key.isWritable()) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        wakeUp();
+    }
+
+    private void clearOpWrite(SelectionKey key) {
+        if (!key.isValid() || !key.isWritable()) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+    }
+
+    void requestClose(SelectionKey key) {
+        tasks.add(new CloseTask(key));
+        wakeUp();
+    }
+
+    @Override
+    public void run() {
+        int selected = 0;
+        working = true;
+        while (working) {
+            if (!tasks.isEmpty()) {
+                Task task;
+                while ((task = tasks.poll()) != null) {
+                    if (task.code == Task.CLOSE) {
+                        doClose(((CloseTask) task).key);
+                    } else if (task.code == Task.CONNECT) {
+                        ConnectTask taskConn = (ConnectTask) task;
+                        doRegister(taskConn.sessionInfo, taskConn.handler);
+                    } else if (task.code == Task.FLUSH) {
+                        FlushTask flushTask = (FlushTask) task;
+                        setOpWrite(flushTask.key);
+                    }
+                }
+            }
+
+            try {
+                selected = selector.select(selectTimeout);
+            } catch (IOException ex) {
+                LOGGER.warn(ex.getMessage(), ex);
+            }
+
+            if (selected > 0) {
+                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                while (it.hasNext()) {
+                    SelectionKey key = it.next();
+                    it.remove();
+
+                    if (!key.isValid()) {
+                        continue;
+                    }
+
+                    if (key.isConnectable()) {
+                        doFinishConnect(key);
+                    } else if (key.isWritable()) {
+                        doWrite(key);
+                    } else if (key.isReadable()) {
+                        doRead(key);
+                    }
+                }
+            }
+        }
+    }
+
+    private static class KeyContext {
+
+        private final EmitterHandler handler;
+
+        private final NioChannelContextNoExecutors channelContext;
+
+        private ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+        public KeyContext(NioChannelContextNoExecutors channelContext, EmitterHandler handler) {
+            this.channelContext = channelContext;
+            this.handler = handler;
+        }
+
+    }
+
+    private static abstract class Task {
+
+        public static final int CLOSE = 1;
+        public static final int CONNECT = 2;
+        public static final int FLUSH = 3;
+
+        private final int code;
+
+        public Task(int code) {
+            this.code = code;
+        }
+
+    }
+
+    private final static class ConnectTask extends Task {
+
+        private final SessionInfo sessionInfo;
+        private final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
+            super(CONNECT);
+            this.sessionInfo = sessionInfo;
+            this.handler = handler;
+        }
+
+    }
+
+    private final static class CloseTask extends Task {
+
+        private final SelectionKey key;
+
+        public CloseTask(SelectionKey key) {
+            super(CLOSE);
+            this.key = key;
+        }
+
+    }
+
+    private final static class FlushTask extends Task {
+
+        private final SelectionKey key;
+
+        public FlushTask(SelectionKey key) {
+            super(FLUSH);
+            this.key = key;
+        }
+
+    }
+}