changeset 521:dd71d49065ad

HttpSynchClientWorker bugfixes
author Devel 2
date Fri, 25 Aug 2017 08:46:27 +0200
parents 31fae01b09d6
children b8d7e565adc9
files stress-tester/src/main/java/com/passus/st/Log4jConfigurationFactory.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpClientTest.java stress-tester/src/test/java/com/passus/st/client/http/filter/HttpMessageModificationFilterTest.java
diffstat 7 files changed, 273 insertions(+), 192 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Log4jConfigurationFactory.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Log4jConfigurationFactory.java	Fri Aug 25 08:46:27 2017 +0200
@@ -59,6 +59,7 @@
 
     public static void enableFactory(Level level) {
         System.setProperty("log4j.configurationFactory", "com.passus.st.Log4jConfigurationFactory");
+        System.setProperty("Log4jContextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
         Log4jConfigurationFactory.level = level;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Fri Aug 25 08:46:27 2017 +0200
@@ -101,57 +101,64 @@
         this.sleepFactor = sleepFactor;
     }
 
-    protected final void changeFlowState(HttpFlowContext context, int state) {
-        int oldState = context.state;
+    protected final void changeFlowState(HttpFlowContext flowContext, int state) {
+        if (flowContext.state == state) {
+            return;
+        }
+
+        int oldState = flowContext.state;
         if (logger.isDebugEnabled()) {
-            debug(context, "Flow status changing {} -> {}.",
-                    context.stateString(), HttpFlowContext.contextStateToString(state)
+            debug(flowContext, "Flow status changing {} -> {}.",
+                    flowContext.stateString(), HttpFlowContext.contextStateToString(state)
             );
         }
 
         switch (state) {
             case HttpFlowContext.STATE_CONNECTING:
-                context.clear();
+                flowContext.clear();
                 break;
             case HttpFlowContext.STATE_CONNECTED:
-                context.decoder = new HttpFullMessageDecoder();
-                context.decoder.setDecodeRequest(false);
-                context.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY);
+                flowContext.decoder = new HttpFullMessageDecoder();
+                flowContext.decoder.setDecodeRequest(false);
+                flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY);
                 break;
             case HttpFlowContext.STATE_ERROR:
-                context.sentEvent = null;
+                flowContext.sentEvent = null;
                 break;
             case HttpFlowContext.STATE_RESP_RECEIVED:
-                context.sentEvent = null;
-                context.receivedStartTimestamp = -1;
+                flowContext.sentEvent = null;
+                flowContext.receivedStartTimestamp = -1;
                 break;
             case HttpFlowContext.STATE_DISCONNECTING:
-                if (context.state < HttpFlowContext.STATE_DISCONNECTING) {
-                    context.clear();
-                    if (context.channelContext != null) {
+                if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) {
+                    if (flowContext.channelContext != null) {
                         try {
-                            context.channelContext.close();
+                            flowContext.channelContext.close();
                         } catch (Exception e) {
                             if (logger.isDebugEnabled()) {
                                 logger.debug(e.getMessage(), e);
                             }
                         }
+                    } else {
+                        changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED);
                     }
                 } else {
                     return;
                 }
                 break;
             case HttpFlowContext.STATE_DISCONNECTED:
-                context.state = HttpFlowContext.STATE_DISCONNECTED;
-                context.timeout = -1;
-                context.clear();
+                flowContext.state = HttpFlowContext.STATE_DISCONNECTED;
+                flowContext.timeout = -1;
+                flowContext.clear();
+                removeFlowContext(flowContext);
+                flowStateChanged(flowContext, oldState);
                 return;
         }
 
-        long timeout = timeouts.get(context.state);
-        context.timeout = System.currentTimeMillis() + timeout;
-        context.state = state;
-        flowStateChanged(context, oldState);
+        long timeout = timeouts.get(flowContext.state);
+        flowContext.timeout = System.currentTimeMillis() + timeout;
+        flowContext.state = state;
+        flowStateChanged(flowContext, oldState);
     }
 
     protected void flowStateChanged(HttpFlowContext context, int oldState) {
@@ -186,14 +193,16 @@
     }
 
     protected HttpFlowContext register(SessionInfo session) {
-        if (sessions.containsKey(session)) {
-            logger.warn("Unable to register session '" + session + "'. Session already registered.");
-            return null;
+        synchronized (lock) {
+            if (sessions.containsKey(session)) {
+                logger.warn("Unable to register session '" + session + "'. Session already registered.");
+                return null;
+            }
+
+            HttpFlowContext flowContext = createFlowContext(session);
+            sessions.put(session, flowContext);
+            return flowContext;
         }
-
-        HttpFlowContext flowContext = createFlowContext(session);
-        sessions.put(session, flowContext);
-        return flowContext;
     }
 
     protected HttpFlowContext connect(SessionEvent sessionEvent) {
@@ -201,14 +210,18 @@
     }
 
     protected HttpFlowContext connect(SessionInfo session) {
-        try {
-            HttpFlowContext flowContext = register(session);
-            emitter.connect(session, this, index);
-            return flowContext;
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        synchronized (lock) {
+            try {
+                HttpFlowContext flowContext = register(session);
+                if (flowContext != null) {
+                    emitter.connect(session, this, index);
+                    return flowContext;
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            return null;
         }
-        return null;
     }
 
     @Override
@@ -217,7 +230,7 @@
             for (Map.Entry<SessionInfo, HttpFlowContext> entry : sessions.entrySet()) {
                 HttpFlowContext flowContext = entry.getValue();
                 try {
-                    closeSession(flowContext, false);
+                    closeSession(flowContext);
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
@@ -237,7 +250,7 @@
     protected void close(HttpFlowContext flowContext) {
         synchronized (lock) {
             try {
-                closeSession(flowContext, true);
+                closeSession(flowContext);
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     logger.debug(e.getMessage(), e);
@@ -251,7 +264,7 @@
         synchronized (lock) {
             try {
                 HttpFlowContext flowContext = flowContext(session);
-                closeSession(flowContext, true);
+                closeSession(flowContext);
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     logger.debug(e.getMessage(), e);
@@ -260,38 +273,41 @@
         }
     }
 
-    protected void closeSession(HttpFlowContext flowContext, boolean remove) {
-        if (flowContext != null) {
-            changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING);
-            if (remove) {
-                removeFlowContext(flowContext);
+    protected void closeSession(HttpFlowContext flowContext) {
+        synchronized (lock) {
+            if (flowContext != null) {
+                changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING);
             }
         }
     }
 
     protected void removeFlowContext(HttpFlowContext flowContext) {
-        sessions.remove(flowContext.sessionInfo());
+        synchronized (lock) {
+            debug(flowContext, "removeFlowContext");
+            sessions.remove(flowContext.sessionInfo());
+        }
     }
 
     protected void reconnect(HttpFlowContext flowContext) {
-        try {
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Reconnect (state: {}).", flowContext.stateString());
-            }
+        synchronized (lock) {
+            try {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Reconnect (state: {}).", flowContext.stateString());
+                }
 
-            SessionInfo session = flowContext.sessionInfo();
-            changeFlowState(flowContext, HttpFlowContext.STATE_CONNECTING);
-            emitter.connect(session, this, index);
-        } catch (Exception e) {
-            error(flowContext, e.getMessage(), e);
+                SessionInfo session = flowContext.sessionInfo();
+                changeFlowState(flowContext, HttpFlowContext.STATE_CONNECTING);
+                emitter.connect(session, this, index);
+            } catch (Exception e) {
+                error(flowContext, e.getMessage(), e);
+            }
         }
-
     }
 
     protected void closeAllConnections() {
         synchronized (lock) {
             for (HttpFlowContext flowContext : sessions.values()) {
-                closeSession(flowContext, true);
+                closeSession(flowContext);
             }
         }
     }
@@ -343,7 +359,6 @@
             }
 
             lock.notifyAll();
-
         }
     }
 
@@ -519,25 +534,26 @@
                     long now = System.currentTimeMillis();
                     req.setTag(TAG_TIME_START, now);
                     context.sendStartTimestamp = now;
+                    changeFlowState(context, HttpFlowContext.STATE_REQ_SENT);
+                    context.sentEvent = event;
+                    
                     context.channelContext.writeAndFlush(context.buffer);
                     if (logger.isDebugEnabled()) {
-                        debug(context, "Request '{}' sent ({} bytes).", req.getUrl(), context.buffer.length());
+                        debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer.length());
                     }
+                    
                     context.buffer.clear();
-
-                    context.sentEvent = event;
-                    changeFlowState(context, HttpFlowContext.STATE_REQ_SENT);
+                    
+                    return true;
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         debug(context, e.getMessage(), e);
                     }
-
-                    return false;
                 }
             }
         }
 
-        return true;
+        return false;
     }
 
     protected void processTimeouts() {
@@ -560,7 +576,7 @@
                                 case HttpFlowContext.STATE_CONNECTED:
                                 case HttpFlowContext.STATE_REQ_SENT:
                                 case HttpFlowContext.STATE_ERROR:
-                                    closeSession(flowContext, true);
+                                    closeSession(flowContext);
                                     break;
                                 case HttpFlowContext.STATE_RESP_RECEIVED:
                                     //Dziwny blad nie powinien wystepowac
@@ -617,15 +633,19 @@
             message = String.format(message, args);
         }
 
+        SessionInfo session = flowContext.sessionInfo();
+        int localPort = session.getSrcPort();
         if (args.length == 0) {
-            logger.log(level, message + " [{}]", flowContext.sessionInfo());
+            logger.log(level, message + " [{},{}]", session, localPort);
         } else {
-            Object[] logArgs = new Object[args.length + 1];
+            Object[] logArgs = new Object[args.length + 2];
             for (int i = 0; i < args.length; i++) {
                 logArgs[i] = args[i];
             }
-            logArgs[args.length] = flowContext.sessionInfo();
-            logger.log(level, message + " [{}]", logArgs);
+
+            logArgs[logArgs.length - 2] = session;
+            logArgs[logArgs.length - 1] = localPort;
+            logger.log(level, message + " [{},{}]", logArgs);
         }
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Fri Aug 25 08:46:27 2017 +0200
@@ -1,12 +1,12 @@
 package com.passus.st.client.http;
 
+import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
@@ -25,6 +25,9 @@
 
     private long eventsQueueWaitTime = 100;
 
+    /**
+     * Context dla ktorego wykonywana jest operacja.
+     */
     private HttpFlowContext currFlowContext;
 
     private boolean loopEnd = false;
@@ -38,12 +41,23 @@
         return working;
     }
 
+    public long getEventsQueueWaitTime() {
+        return eventsQueueWaitTime;
+    }
+
+    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
+        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
+        this.eventsQueueWaitTime = eventsQueueWaitTime;
+    }
+
     @Override
-    protected void closeSession(HttpFlowContext flowContext, boolean remove) {
-        if (flowContext != null) {
-            super.closeSession(flowContext, remove);
-
-            if (currFlowContext == flowContext) {
+    protected void flowStateChanged(HttpFlowContext context, int oldState) {
+        logger.debug("flowStateChanged {},{}", context == currFlowContext, context.state());
+        if (context == currFlowContext) {
+            if (context.state() == HttpFlowContext.STATE_CONNECTED
+                    || context.state() == HttpFlowContext.STATE_RESP_RECEIVED
+                    || context.state() == HttpFlowContext.STATE_ERROR
+                    || context.state() == HttpFlowContext.STATE_DISCONNECTED) {
                 currFlowContext = null;
             }
         }
@@ -64,14 +78,6 @@
     }
 
     @Override
-    public void errorOccured(ChannelContext context, Throwable cause) throws Exception {
-        synchronized (lock) {
-            currFlowContext = null;
-            super.errorOccured(context, cause);
-        }
-    }
-
-    @Override
     protected void closeAllConnections() {
         synchronized (lock) {
             boolean wait;
@@ -93,40 +99,43 @@
             } while (wait);
 
             super.closeAllConnections();
-            do {
+            while (!sessions.isEmpty()) {
                 try {
                     lock.wait(100);
                 } catch (Exception e) {
                 }
-            } while (!sessions.isEmpty());
+            }
         }
     }
 
     private boolean pollNext() {
-        Event event = eventsQueue.peek();
+        if (currFlowContext != null) {
+            return false;
+        }
+
+        Event event = eventsQueue.poll();
         if (event != null) {
             sleep(event);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Event processing: {}", event);
+            }
+
             if (event instanceof SessionEvent) {
                 if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) {
-                    eventsQueue.poll();
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                     if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
                         try {
-                            SessionInfo session = statusEvent.getSessionInfo();
-                            currFlowContext = register(session);
-                            emitter.connect(session, this, index);
+                            currFlowContext = connect(statusEvent);
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
                         }
 
-                        return false;
+                        return (currFlowContext == null);
                     } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                         currFlowContext = flowContext((SessionEvent) event);
                         if (currFlowContext != null) {
                             if (currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                                SessionInfo session = statusEvent.getSessionInfo();
-                                close(session);
-                                currFlowContext = null;
+                                close(statusEvent);
                             }
                         }
                     }
@@ -134,22 +143,25 @@
                     return true;
                 } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) {
                     SessionEvent sessEvent = (SessionEvent) event;
-                    currFlowContext = flowContext(sessEvent);
-                    if (currFlowContext != null) {
-                        switch (currFlowContext.state) {
+                    HttpFlowContext flowContext = flowContext(sessEvent);
+                    if (flowContext != null) {
+                        switch (flowContext.state) {
                             case HttpFlowContext.STATE_CONNECTED:
                             case HttpFlowContext.STATE_RESP_RECEIVED:
                             case HttpFlowContext.STATE_ERROR:
-                                eventsQueue.poll();
-                                send(currFlowContext, (HttpSessionPayloadEvent) event);
-                                return false;
+                                currFlowContext = flowContext;
+                                if (send(flowContext, (HttpSessionPayloadEvent) event)) {
+                                    return false;
+                                } else {
+                                    currFlowContext = null;
+                                    return true;
+                                }
                             case HttpFlowContext.STATE_DISCONNECTING:
                             case HttpFlowContext.STATE_DISCONNECTED:
                                 if (connectPartialSession) {
                                     currFlowContext = connect(sessEvent);
                                     return false;
                                 } else {
-                                    eventsQueue.poll();
                                     return true;
                                 }
                             default:
@@ -157,37 +169,35 @@
                         }
                     } else if (connectPartialSession) {
                         currFlowContext = connect(sessEvent);
-                        return false;
-                    } else {
-                        eventsQueue.poll();
-                        return true;
+                        if (currFlowContext != null) {
+                            eventsQueue.addFirst(sessEvent);
+                            return false;
+                        } else {
+                            return true;
+                        }
                     }
+
+                    return true;
                 } else {
-                    eventsQueue.poll();
                     return true;
                 }
-            } else {
-                eventsQueue.poll();
-                if (event.getType() == DataLoopEnd.TYPE) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("DataLoopEnd received.");
-                    }
+            } else if (event.getType() == DataLoopEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataLoopEnd received.");
+                }
 
-                    loopEnd = true;
-                    closeAllConnections();
-                    filterChain.reset();
-                    loopEnd = false;
-                    return true;
-                } else if (event.getType() == DataEnd.TYPE) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("DataEnd received. Deactivation.");
-                    }
+                loopEnd = true;
+                closeAllConnections();
+                filterChain.reset();
+                loopEnd = false;
+                return true;
+            } else if (event.getType() == DataEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataEnd received. Deactivation.");
+                }
 
-                    working = false;
-                }
+                working = false;
             }
-        } else {
-            currFlowContext = null;
         }
 
         return false;
@@ -204,19 +214,14 @@
                     } catch (InterruptedException ignore) {
                     }
 
-                    boolean pollNext = false;
+                    boolean nextPoll = true;
                     do {
                         if (loopEnd) {
                             break;
                         }
 
-                        if (currFlowContext == null) {
-                            pollNext = pollNext();
-                        } else if (currFlowContext.state != HttpFlowContext.STATE_CONNECTING
-                                && currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                            pollNext = pollNext();
-                        }
-                    } while (pollNext);
+                        nextPoll = pollNext();
+                    } while (nextPoll);
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         logger.debug(e.getMessage(), e);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Fri Aug 25 08:46:27 2017 +0200
@@ -6,11 +6,12 @@
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 import java.io.IOException;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
-import java.nio.channels.Pipe;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  *
@@ -24,7 +25,7 @@
 
     private final SocketChannel channel;
 
-    private final Pipe pipe;
+    private final Queue<ByteBuffer> dataQueue;
 
     private SocketAddress localAddress;
 
@@ -32,24 +33,28 @@
 
     private SelectionKey key;
 
-    public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo, Pipe pipe) {
+    public static AtomicInteger writes = new AtomicInteger();
+    
+    public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         this.worker = worker;
         this.channel = channel;
         this.remoteAddress = remoteAddress;
         this.sessionInfo = sessionInfo;
-        this.pipe = pipe;
+        this.dataQueue = new LinkedList<>();
+
     }
 
-    Pipe pipe() {
-        return pipe;
+    Queue<ByteBuffer> dataQueue() {
+        return dataQueue;
     }
 
     void selectionKey(SelectionKey key) {
         this.key = key;
     }
 
-    private void writeToPipe(ByteBuffer buffer) throws IOException {
-        pipe.sink().write(buffer);
+    private void addToQeueu(ByteBuffer buffer) throws IOException {
+        dataQueue.add(buffer);
+        writes.incrementAndGet();
     }
 
     @Override
@@ -64,17 +69,17 @@
 
     @Override
     public void write(byte[] data, int offset, int length) throws IOException {
-        writeToPipe(ByteBuffer.wrap(data, offset, length));
+        addToQeueu(ByteBuffer.wrap(data, offset, length));
     }
 
     @Override
     public void write(ByteBuff data) throws IOException {
-        writeToPipe(data.toNioByteBuffer());
+        addToQeueu(data.toNioByteBuffer());
     }
 
     @Override
     public void flush() {
-        NioEmitterWorker.setOpWrite(key);
+        worker.flush(key);
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Fri Aug 25 08:46:27 2017 +0200
@@ -16,7 +16,6 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
-import java.nio.channels.Pipe;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -141,7 +140,12 @@
     }
 
     public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
-        tasks.add(new TaskConnect(sessionInfo, handler));
+        tasks.add(new ConnectTask(sessionInfo, handler));
+        selector.wakeup();
+    }
+
+    void flush(SelectionKey key) {
+        tasks.add(new FlushTask(key));
         selector.wakeup();
     }
 
@@ -174,13 +178,12 @@
                 channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
             }
 
-            Pipe pipe = Pipe.open();
             SocketAddress remoteAddress = connParams.getRemoteAddress();
             if (remoteAddress == null) {
                 remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
             }
 
-            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo, pipe);
+            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
             KeyContext keyContext = new KeyContext(channelContext, handler);
             SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
             try {
@@ -229,7 +232,7 @@
 
             if (timeouted) {
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Connection to '" + keyContext.channelContext.getRemoteAddress() + "' timed out.");
+                    LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
                 }
 
                 throw new ConnectException("Connection timed out.");
@@ -251,12 +254,12 @@
                 metric.addBindSocket(keyContext.channelContext.getLocalAddress());
             }
 
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
+            }
+
             keyContext.handler.channelActive(keyContext.channelContext);
             setOpRead(key);
-
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Connected to '" + keyContext.channelContext.getRemoteAddress() + "'.");
-            }
         } catch (Exception ex) {
             LOGGER.error(ex.getMessage(), ex);
         }
@@ -265,27 +268,19 @@
     private void doWrite(SelectionKey key) {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
-        ByteBuffer buffer = keyContext.buffer;
-
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Writing to '" + keyContext.channelContext.getRemoteAddress() + "'.");
+            LOGGER.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress());
         }
 
-        keyContext.buffer.clear();
-
-        Pipe.SourceChannel source = keyContext.channelContext.pipe().source();
-        try {
-            source.configureBlocking(false);
-        } catch (Exception e) {
-        }
-
-        int readed;
+        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
         int written = 0;
         try {
-            while ((readed = source.read(buffer)) > 0) {
-                buffer.flip();
+            ByteBuffer buffer;
+            while (!queue.isEmpty()) {
+                buffer = queue.poll();
                 while (buffer.hasRemaining()) {
                     int res = socketChannel.write(buffer);
+
                     if (res == -1) {
                         doClose(key);
                         return;
@@ -297,8 +292,6 @@
 
                     written += res;
                 }
-
-                buffer.clear();
             }
         } catch (Exception e) {
             doCatchException(key, e);
@@ -312,6 +305,7 @@
 
         //TODO Operacje na handlerach powinny przechodzic przez Executor
         try {
+            NioChannelContext.writes.decrementAndGet();
             keyContext.handler.dataWritten(keyContext.channelContext);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
@@ -467,7 +461,7 @@
     }
 
     void requestClose(SelectionKey key) {
-        tasks.add(new TaskClose(key));
+        tasks.add(new CloseTask(key));
         key.selector().wakeup();
     }
 
@@ -476,24 +470,27 @@
         int selected = 0;
         working = true;
         while (working) {
+            if (!tasks.isEmpty()) {
+                Task task;
+                while ((task = tasks.poll()) != null) {
+                    if (task.code == Task.CLOSE) {
+                        doClose(((CloseTask) task).key);
+                    } else if (task.code == Task.CONNECT) {
+                        ConnectTask taskConn = (ConnectTask) task;
+                        doConnect(taskConn.sessionInfo, taskConn.handler);
+                    } else if (task.code == Task.FLUSH) {
+                        FlushTask flushTask = (FlushTask) task;
+                        setOpWrite(flushTask.key);
+                    }
+                }
+            }
+
             try {
                 selected = selector.select(selectTimeout);
             } catch (IOException ex) {
                 LOGGER.warn(ex.getMessage(), ex);
             }
 
-            if (!tasks.isEmpty()) {
-                Task task;
-                while ((task = tasks.poll()) != null) {
-                    if (task.code == Task.CLOSE) {
-                        doClose(((TaskClose) task).key);
-                    } else if (task.code == Task.CONNECT) {
-                        TaskConnect taskConn = (TaskConnect) task;
-                        doConnect(taskConn.sessionInfo, taskConn.handler);
-                    }
-                }
-            }
-
             if (selected > 0) {
                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                 while (it.hasNext()) {
@@ -535,6 +532,7 @@
 
         public static final int CLOSE = 1;
         public static final int CONNECT = 2;
+        public static final int FLUSH = 3;
 
         private final int code;
 
@@ -544,12 +542,12 @@
 
     }
 
-    private static class TaskConnect extends Task {
+    private final static class ConnectTask extends Task {
 
         private final SessionInfo sessionInfo;
         private final EmitterHandler handler;
 
-        public TaskConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
             super(CONNECT);
             this.sessionInfo = sessionInfo;
             this.handler = handler;
@@ -557,14 +555,25 @@
 
     }
 
-    private static class TaskClose extends Task {
+    private final static class CloseTask extends Task { // queued by requestClose; task loop runs doClose(key)
 
         private final SelectionKey key;
 
-        public TaskClose(SelectionKey key) {
+        public CloseTask(SelectionKey key) { // passes the CLOSE code up to Task
             super(CLOSE);
             this.key = key;
         }
 
     }
+
+    private final static class FlushTask extends Task { // task loop answers a FLUSH task with setOpWrite(key)
+
+        private final SelectionKey key; // key handed to setOpWrite when the task is processed
+
+        public FlushTask(SelectionKey key) {
+            super(FLUSH); // passes the FLUSH code (3) up to Task
+            this.key = key;
+        }
+
+    }
 }
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpClientTest.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpClientTest.java	Fri Aug 25 08:46:27 2017 +0200
@@ -10,7 +10,6 @@
 import com.passus.st.client.SessionEvent;
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.client.TestHttpClientListener;
-import com.passus.st.client.TestHttpClientListener.HttpClientEventType;
 import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent;
 import com.passus.st.utils.EventUtils;
 import java.util.List;
@@ -19,6 +18,7 @@
 import org.testng.annotations.Test;
 import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import com.passus.commons.service.ServiceUtils;
+import java.util.LinkedList;
 import org.testng.annotations.BeforeMethod;
 
 /**
@@ -56,7 +56,6 @@
 
         NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + PORT);
         emitter.start();
-
         TestHttpClientListener listner = new TestHttpClientListener();
 
         HttpClient client = new HttpClient(emitter);
@@ -83,7 +82,7 @@
         }
     }
 
-    @Test
+    @Test(enabled = true)
     public void testHandle_ConnectPartialSession() throws Exception {
         Properties props = new Properties();
         props.put("allowPartialSession", "true");
@@ -107,7 +106,6 @@
             client.join();
 
             assertTrue(listner.size() > 0);
-            assertTrue(listner.size() > 0);
             assertTrue(listner.get(0) instanceof ResponseReceivedEvent);
         } finally {
             ServiceUtils.stopQuietly(client);
@@ -115,4 +113,47 @@
         }
     }
 
+    @Test(enabled = true)
+    public void testHandle_ThreeLoops() throws Exception { // handles the pcap's events three times, expects three responses
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        LinkedList<Event> events = new LinkedList<>(EventUtils.readEvents("pcap/http/http_req_resp.pcap", props));
+        assertEquals(4, events.size()); // NOTE(review): TestNG's Assert.assertEquals takes (actual, expected) - confirm which assertEquals is imported
+        Event dataEnd = events.removeLast(); // Remove DataEnd; it is handled once, after the loops below
+
+        SessionEvent sessionEvent = (SessionEvent) events.get(0);
+        events.addFirst(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); // prepend an ESTABLISHED status for the same session
+
+        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + PORT);
+        emitter.start();
+        TestHttpClientListener listner = new TestHttpClientListener();
+
+        HttpClient client = new HttpClient(emitter);
+        try {
+            client.addListener(listner);
+            client.start();
+
+            for (int i = 0; i < 3; i++) { // feed the same event list to the client three times
+                events.forEach((event) -> {
+                    client.handle(event);
+                });
+            }
+            client.handle(dataEnd); // the DataEnd event removed from the list above
+            client.join();
+
+            assertEquals(3, listner.size()); // exactly three recorded listener events, each checked below
+
+            for (int i = 0; i < 3; i++) {
+                assertTrue(listner.get(i) instanceof ResponseReceivedEvent);
+                ResponseReceivedEvent event = (ResponseReceivedEvent) listner.get(i);
+                String responseStr = event.getResponse().toString();
+                assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); // status line of the response text
+                assertTrue(responseStr.endsWith("test")); // response text must end with "test"
+            }
+        } finally {
+            ServiceUtils.stopQuietly(client); // stop client and emitter even when an assertion fails
+            ServiceUtils.stopQuietly(emitter);
+        }
+    }
 }
--- a/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpMessageModificationFilterTest.java	Wed Aug 23 10:11:02 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpMessageModificationFilterTest.java	Fri Aug 25 08:46:27 2017 +0200
@@ -139,7 +139,7 @@
 
     }
 
-    @Test
+    @Test(enabled = true)
     public void testComplexExpression() throws Exception {
         String filterConfig = "filters:\n"
                 + "    - type: modifyMessage\n"