changeset 958:c15144d4da64

Refactorization in progress
author Devel 2
date Thu, 30 May 2019 10:26:34 +0200
parents 8fc6d1cae116
children 773f0f4ff33b
files stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java stress-tester/src/main/java/com/passus/st/PcapReporter.java stress-tester/src/main/java/com/passus/st/ReaderMain.java stress-tester/src/main/java/com/passus/st/StatsMain.java stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/FilterAware.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpParallelClientWorkerTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java 
stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java
diffstat 28 files changed, 1105 insertions(+), 2611 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java	Thu May 30 10:26:34 2019 +0200
@@ -4,7 +4,6 @@
 import com.passus.commons.service.Service;
 import com.passus.st.client.*;
 import com.passus.st.client.http.HttpScopes;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.client.http.filter.HttpFilter;
 import com.passus.st.client.http.filter.HttpFlowUtils;
 import com.passus.st.emitter.SessionInfo;
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu May 30 10:26:34 2019 +0200
@@ -6,7 +6,8 @@
 import com.passus.net.http.*;
 import com.passus.net.http.session.HttpSessionAnalyzer;
 import com.passus.st.client.*;
-import com.passus.st.client.http.*;
+import com.passus.st.client.http.HttpScopes;
+import com.passus.st.client.http.ReporterRemoteDestination;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.metric.FileMetricsCollectionAppender;
 import com.passus.st.metric.ScheduledMetricsCollector;
@@ -27,6 +28,7 @@
 import java.util.stream.Collectors;
 
 import static com.passus.st.Main.*;
+import static com.passus.st.Protocols.HTTP;
 import static com.passus.st.client.http.HttpConsts.*;
 import static com.passus.st.client.http.filter.HttpFlowUtils.createFlowContext;
 import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
@@ -243,8 +245,8 @@
                         }
                     }
 
-                } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-                    HttpSessionPayloadEvent sessEvent = (HttpSessionPayloadEvent) event;
+                } else if (event.getType() == SessionPayloadEvent.TYPE) {
+                    SessionPayloadEvent sessEvent = (SessionPayloadEvent) event;
                     currFlowContext = flowContext(sessEvent);
                     if (currFlowContext == null && partialSession) {
                         try {
@@ -264,24 +266,26 @@
             }
         }
 
-        private void send(HttpSessionPayloadEvent event) {
+        private void send(SessionPayloadEvent event) {
             synchronized (lock) {
-                HttpRequest req = event.getRequest();
-                HttpResponse resp = event.getResponse();
-                System.out.println(req == null ? null : req.getUrl());
-                ++count;
-                if (req != null && resp != null) {
-                    try {
-                        setSizeAndTimeTags(req, reqEncoder);
-                        setSizeAndTimeTags(resp, respEncoder);
-                        FlowContext ctx = sessions.get(event.getSessionInfo());
-                        if (ctx != null) {
-                            extractHttpContext(ctx).scopes().removeConversation(req);
+                if (event.getProtocolId() == HTTP) {
+                    HttpRequest req = (HttpRequest) event.getRequest();
+                    HttpResponse resp = (HttpResponse) event.getResponse();
+                    System.out.println(req == null ? null : req.getUrl());
+                    ++count;
+                    if (req != null && resp != null) {
+                        try {
+                            setSizeAndTimeTags(req, reqEncoder);
+                            setSizeAndTimeTags(resp, respEncoder);
+                            FlowContext ctx = sessions.get(event.getSessionInfo());
+                            if (ctx != null) {
+                                extractHttpContext(ctx).scopes().removeConversation(req);
+                            }
+
+                            reporter.responseReceived(req, resp, ctx);
+                        } catch (Exception e) {
+                            logger.debug(e.getMessage(), e);
                         }
-
-                        reporter.responseReceived(req, resp, ctx);
-                    } catch (Exception e) {
-                        logger.debug(e.getMessage(), e);
                     }
                 }
             }
--- a/stress-tester/src/main/java/com/passus/st/ReaderMain.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/ReaderMain.java	Thu May 30 10:26:34 2019 +0200
@@ -2,25 +2,16 @@
 
 import com.passus.data.DataSourceUtils;
 import com.passus.data.PooledByteBuffAllocator;
-import com.passus.net.http.HttpMessageHelper;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpRequestEncoder;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.HttpResponseEncoder;
+import com.passus.net.http.*;
 import com.passus.st.client.Event;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.client.http.filter.HttpMessagePredicate;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.filter.Transformers;
 import com.passus.st.source.EventSource;
 import com.passus.st.source.NcEventSource;
 import com.passus.st.source.PcapSessionEventSource;
-import static com.passus.st.utils.CliUtils.option;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.function.Predicate;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -28,8 +19,15 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.function.Predicate;
+
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.utils.CliUtils.option;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class ReaderMain {
@@ -113,60 +111,62 @@
             printSessionInfo(statusEvent.getSessionInfo(), ps);
             ps.print(" status: ");
             ps.println(SessionStatusEvent.statusToString(statusEvent.getStatus()));
-        } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-            HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
-            HttpRequest req = payloadEvent.getRequest();
-            HttpResponse resp = payloadEvent.getResponse();
-            if (filter != null && !filter.test(req, resp, null)) {
-                return;
-            }
-
-            printSessionInfo(payloadEvent.getSessionInfo(), ps);
-            ps.println(" payload: ");
-
-            if (req != null) {
-                try {
-                    HttpRequestEncoder.INSTANCE.encodeHeaders(req, ps);
-                    if (printContent) {
-                        if (!printBinaryContent && messageHelper.hasBinaryContent(req)) {
-                            ps.println("<binary content>");
-                        } else {
-                            if (decodeContent) {
-                                messageHelper.decodeContent(req, false);
-                            }
-
-                            HttpRequestEncoder.INSTANCE.encodeContent(req, ps);
-                        }
-                    }
+        } else if (event.getType() == SessionPayloadEvent.TYPE) {
+            SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
+            if (payloadEvent.getProtocolId() == HTTP) {
+                HttpRequest req = (HttpRequest) payloadEvent.getRequest();
+                HttpResponse resp = (HttpResponse) payloadEvent.getResponse();
+                if (filter != null && !filter.test(req, resp, null)) {
+                    return;
+                }
 
-                    ps.println("");
-                } catch (IOException ex) {
-                    logger.debug(ex.getMessage(), ex);
-                } finally {
-                    DataSourceUtils.release(req.getContent());
-                }
-            }
+                printSessionInfo(payloadEvent.getSessionInfo(), ps);
+                ps.println(" payload: ");
 
-            if (resp != null) {
-                try {
-                    HttpResponseEncoder.INSTANCE.encodeHeaders(resp, ps);
-                    if (printContent) {
-                        if (!printBinaryContent && messageHelper.hasBinaryContent(req)) {
-                            ps.println("<binary content>");
-                        } else {
-                            if (decodeContent) {
-                                messageHelper.decodeContent(resp, false);
+                if (req != null) {
+                    try {
+                        HttpRequestEncoder.INSTANCE.encodeHeaders(req, ps);
+                        if (printContent) {
+                            if (!printBinaryContent && messageHelper.hasBinaryContent(req)) {
+                                ps.println("<binary content>");
+                            } else {
+                                if (decodeContent) {
+                                    messageHelper.decodeContent(req, false);
+                                }
+
+                                HttpRequestEncoder.INSTANCE.encodeContent(req, ps);
                             }
-
-                            HttpResponseEncoder.INSTANCE.encodeContent(resp, ps);
                         }
-                    }
 
-                    ps.println("");
-                } catch (IOException ex) {
-                    logger.debug(ex.getMessage(), ex);
-                } finally {
-                    DataSourceUtils.release(resp.getContent());
+                        ps.println("");
+                    } catch (IOException ex) {
+                        logger.debug(ex.getMessage(), ex);
+                    } finally {
+                        DataSourceUtils.release(req.getContent());
+                    }
+                }
+
+                if (resp != null) {
+                    try {
+                        HttpResponseEncoder.INSTANCE.encodeHeaders(resp, ps);
+                        if (printContent) {
+                            if (!printBinaryContent && messageHelper.hasBinaryContent(req)) {
+                                ps.println("<binary content>");
+                            } else {
+                                if (decodeContent) {
+                                    messageHelper.decodeContent(resp, false);
+                                }
+
+                                HttpResponseEncoder.INSTANCE.encodeContent(resp, ps);
+                            }
+                        }
+
+                        ps.println("");
+                    } catch (IOException ex) {
+                        logger.debug(ex.getMessage(), ex);
+                    } finally {
+                        DataSourceUtils.release(resp.getContent());
+                    }
                 }
             }
         }
--- a/stress-tester/src/main/java/com/passus/st/StatsMain.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/StatsMain.java	Thu May 30 10:26:34 2019 +0200
@@ -5,20 +5,16 @@
 import com.passus.data.ByteString;
 import com.passus.data.ByteStringImpl;
 import com.passus.data.PooledByteBuffAllocator;
-import static com.passus.net.http.HttpHeaders.CONTENT_ENCODING;
-import static com.passus.net.http.HttpHeaders.TRANSFER_ENCODING;
 import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpMessageHelper;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.st.client.Event;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.source.EventSource;
 import com.passus.st.source.NcEventSource;
 import com.passus.st.source.PcapSessionEventSource;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -27,8 +23,14 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.passus.net.http.HttpHeaders.CONTENT_ENCODING;
+import static com.passus.net.http.HttpHeaders.TRANSFER_ENCODING;
+import static com.passus.st.Protocols.HTTP;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class StatsMain {
@@ -83,30 +85,32 @@
     private void stats(Event event) {
         if (event.getType() == SessionStatusEvent.TYPE) {
 
-        } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-            HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
-            HttpRequest req = payloadEvent.getRequest();
-            HttpResponse resp = payloadEvent.getResponse();
+        } else if (event.getType() == SessionPayloadEvent.TYPE) {
+            SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
+            if (payloadEvent.getProtocolId() == HTTP) {
+                HttpRequest req = (HttpRequest) payloadEvent.getRequest();
+                HttpResponse resp = (HttpResponse) payloadEvent.getResponse();
 
-            if (req != null) {
-                stats.reqNum++;
-                stats.addContentEncoding(req);
-                stats.addTransferEncoding(req);
-                stats.addEncoding(req);
-                if (req.getContent() != null) {
-                    stats.reqContentSize.update(req.getContent().available());
-                    stats.reqContentSizeSum += req.getContent().available();
+                if (req != null) {
+                    stats.reqNum++;
+                    stats.addContentEncoding(req);
+                    stats.addTransferEncoding(req);
+                    stats.addEncoding(req);
+                    if (req.getContent() != null) {
+                        stats.reqContentSize.update(req.getContent().available());
+                        stats.reqContentSizeSum += req.getContent().available();
+                    }
                 }
-            }
 
-            if (resp != null) {
-                stats.respNum++;
-                stats.addContentEncoding(resp);
-                stats.addTransferEncoding(resp);
-                stats.addEncoding(resp);
-                if (resp.getContent() != null) {
-                    stats.respContentSize.update(resp.getContent().available());
-                    stats.respContentSizeSum += resp.getContent().available();
+                if (resp != null) {
+                    stats.respNum++;
+                    stats.addContentEncoding(resp);
+                    stats.addTransferEncoding(resp);
+                    stats.addEncoding(resp);
+                    if (resp.getContent() != null) {
+                        stats.respContentSize.update(resp.getContent().available());
+                        stats.respContentSizeSum += resp.getContent().available();
+                    }
                 }
             }
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu May 30 10:26:34 2019 +0200
@@ -0,0 +1,558 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class AsynchFlowWorker extends FlowWorkerBased {
+
+    public static final String TYPE = "asynch";
+
+    private long waitTimeout = 100;
+
+    private long windowPeriod = 10;
+
+    private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>();
+
+    private TasksTimeWindow currentWindow;
+
+    private volatile boolean processWindow = false;
+
+    private volatile boolean flowStateChanged = false;
+
+    private final Object readerLock = new Object();
+
+    private boolean responseSynch = false;
+
+    public AsynchFlowWorker(Emitter emitter, String name, int index) { // all three arguments go to FlowWorkerBased unchanged
+        super(emitter, name, index);
+        // Nothing else to initialise: the remaining fields keep their declared defaults.
+    }
+
+    public long getWindowPeriod() { // period handed to every TasksTimeWindow built by createWindow()
+        return windowPeriod;
+    }
+
+    public void setWindowPeriod(long windowPeriod) { // checked by Assert.greaterThanZero; used for windows created after this call
+        Assert.greaterThanZero(windowPeriod, "windowPeriod"); // label now names the actual parameter (was "tickTime")
+        this.windowPeriod = windowPeriod;
+    }
+
+    public long getWaitTimeout() { // timeout passed to lock.wait(...) in waitQuietly()
+        return waitTimeout;
+    }
+
+    public void setWaitTimeout(long waitTimeout) { // checked by Assert.greaterThanZero; read by waitQuietly() on its next call
+        Assert.greaterThanZero(waitTimeout, "waitTimeout"); // label now names the actual parameter (was "eventsQueueWaitTime")
+        this.waitTimeout = waitTimeout;
+    }
+
+    public boolean isResponseSynch() { // when true, addEvent() queues an HttpResponseEvent after each payload event
+        return responseSynch;
+    }
+
+    public void setResponseSynch(boolean responseSynch) { // plain field write; only consulted by addEvent()
+        this.responseSynch = responseSynch;
+    }
+
+    private void waitQuietly() { // caller must already hold the monitor of lock (run() does), as Object.wait requires
+        try {
+            lock.wait(waitTimeout);
+        } catch (InterruptedException ignore) { // interruption is dropped on purpose; run() simply re-scans its tasks
+        }
+    }
+
+    @Override
+    protected void closeAllConnections() { // waits for in-flight requests, delegates to super, then waits for sessions to empty
+        synchronized (lock) {
+            boolean wait;
+            do { // phase 1: poll until no flow is in STATE_REQ_SENT
+                wait = false;
+                for (FlowContext flowContext : sessions.values()) {
+                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
+                        wait = true;
+                        break;
+                    }
+                }
+
+                if (wait) {
+                    try {
+                        lock.wait(100); // gives up the monitor for up to 100 ms or until notified
+                    } catch (Exception e) { // any failure while waiting is ignored; the loop re-checks the states
+                    }
+                }
+            } while (wait); // NOTE(review): no upper bound on this loop
+
+            super.closeAllConnections();
+            while (!sessions.isEmpty()) { // phase 2: poll until the sessions map is empty (also unbounded)
+                try {
+                    lock.wait(100);
+                } catch (Exception e) { // ignored, same as above
+                }
+            }
+        }
+    }
+
+    private void removeWindow(TasksTimeWindow window) { // null-tolerant removal from the windows queue
+        if (window != null) {
+            windows.remove(window); // Queue.remove(Object) matches via TasksTimeWindow.equals (start/end time)
+        }
+    }
+
+    private TasksTimeWindow createWindow(long time) { // builds a window around time using the current windowPeriod
+        TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod);
+        windows.add(window); // appended at the tail of the queue
+        return window;
+    }
+
+    private TasksTimeWindow getWindow(long time, boolean create) { // returns the queued window whose range contains time
+        if (windows.isEmpty()) {
+            if (create) {
+                return createWindow(time);
+            }
+
+            return null;
+        }
+
+        TasksTimeWindow firstWindow = windows.peek(); // check the head of the queue first
+        if (firstWindow.inTimeRange(time)) {
+            return firstWindow;
+        } else if (windows.size() > 1) {
+            Iterator<TasksTimeWindow> it = windows.iterator();
+            it.next(); // skip the head, it was tested above
+
+            while (it.hasNext()) {
+                TasksTimeWindow window = it.next();
+                if (window.inTimeRange(time)) {
+                    return window;
+                }
+            }
+        }
+
+        if (create) { // nothing matched: append a new window when asked to, otherwise report null
+            return createWindow(time);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void sessionInvalidated(SessionInfo session) throws Exception { // marks the session as blocked and wakes waiters on lock
+        synchronized (lock) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Session {} invalidated.", session);
+            }
+
+            FlowContext flowContext = flowContext(session);
+            if (flowContext != null) { // a known flow is moved to STATE_DISCONNECTING
+                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
+            }
+
+            addBlockedSession(session); // presumably what isBlockedSession(...) in run() reports later - defined outside this file
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    protected void flowStateChanged(FlowContext context, int oldState) { // both arguments are unused here
+        synchronized (lock) {
+            flowStateChanged = true; // run() reads this flag to re-scan its tasks without calling waitQuietly()
+            lock.notifyAll();
+        }
+    }
+
+    private void addEvent(Event event, TasksTimeWindow window) { // turns an incoming event into a task entry of the given window
+        switch (event.getType()) {
+            case SessionPayloadEvent.TYPE: {
+                Event newEvent = eventInstanceForWorker(event);
+                long time = newEvent.getTimestamp();
+                SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent;
+                SessionInfo session = payloadEvent.getSessionInfo();
+
+                SessionEventsTask task = window.getSessionEventsTask(session, true); // one task per session and window
+                task.events.add(payloadEvent);
+                // instanceof is false for null, so a missing or non-HTTP request/response just skips the marker below
+                if (responseSynch && payloadEvent.getRequest() instanceof HttpRequest && payloadEvent.getResponse() instanceof HttpResponse) {
+                    HttpResponse resp = (HttpResponse) payloadEvent.getResponse();
+                    HttpRequest req = (HttpRequest) payloadEvent.getRequest();
+                    long respTime = time + (resp.getTimestamp() - req.getTimestamp()); // event time shifted by the recorded req->resp delay
+                    HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime);
+
+                    task.events.add(respEvent); // queued right after the payload event of the same session
+                }
+
+                break;
+            }
+            case SessionStatusEvent.TYPE: {
+                Event newEvent = eventInstanceForWorker(event);
+                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
+                SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true);
+                task.events.add(statusEvent);
+                break;
+            }
+            case DataEvents.DataLoopEnd.TYPE: // end-of-loop marker: request processing of the window
+                window.add(DataLoopEndTask.INSTANCE);
+                processWindow = true;
+                break;
+            case DataEvents.DataEnd.TYPE: // end-of-data marker: request processing of the window
+                window.add(DataEndTask.INSTANCE);
+                processWindow = true;
+                break;
+        } // any other event type is ignored
+    }
+
+    @Override
+    public void handle(Event event) { // queues the event into its time window; blocks while a window is being processed
+        synchronized (readerLock) {
+            while (processWindow) { // run() clears the flag and notifies readerLock in its finally block
+                try {
+                    readerLock.wait();
+                } catch (InterruptedException ignore) {
+                    // interruption is ignored; the loop condition is simply evaluated again
+                }
+            }
+        }
+
+        synchronized (lock) {
+            TasksTimeWindow window = getWindow(event.getTimestamp(), true);
+            if (currentWindow == null) {
+                currentWindow = window;
+            } else if (window != currentWindow) { // identity comparison: the event landed in another window instance
+                currentWindow = windows.peek(); // process the head of the queue
+                processWindow = true;
+            }
+
+            addEvent(event, window); // may also set processWindow (data-loop-end / data-end events)
+            if (processWindow) {
+                lock.notifyAll(); // wakes run(), which waits on lock
+            }
+        }
+
+    }
+
+    private boolean processSessionEvent(SessionEvent event) { // true: run() removes the event from its task; false: it stays queued
+        switch (event.getType()) {
+            case SessionStatusEvent.TYPE: {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                    connect(statusEvent);
+                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                    FlowContext flowContext = flowContext(statusEvent);
+                    if (flowContext != null) {
+                        if (flowContext.state() != FlowContext.STATE_REQ_SENT) { // close() is not called while a request is in flight
+                            close(statusEvent);
+                        }
+                    }
+                }
+
+                return true; // status events are always consumed, including the skipped close above
+            }
+            case SessionPayloadEvent.TYPE: {
+                SessionEvent sessEvent = event;
+                FlowContext flowContext = flowContext(sessEvent);
+                if (flowContext != null) {
+                    switch (flowContext.state()) {
+                        case FlowContext.STATE_CONNECTING:
+                        case FlowContext.STATE_REQ_SENT:
+                            return false; // keep the event queued; run() offers it again on a later pass
+                        case FlowContext.STATE_CONNECTED:
+                        case FlowContext.STATE_RESP_RECEIVED:
+                        case FlowContext.STATE_ERROR:
+                            if (send(flowContext, (SessionPayloadEvent) event)) {
+                                return true;
+                            } // no break: when send(...) returns false, control falls through to the cases below
+                        case FlowContext.STATE_DISCONNECTING:
+                        case FlowContext.STATE_DISCONNECTED:
+                            if (connectPartialSession) {
+                                connect(sessEvent);
+                            } else {
+                                return true;
+                            }
+                            break;
+                    }
+                } else if (connectPartialSession) { // no flow yet for this session: connect only when partial sessions are enabled
+                    connect(sessEvent);
+                }
+
+                return true; // the event is consumed on this path even though it was not sent
+            }
+            case HttpResponseEvent.TYPE: { // marker queued by addEvent(): consumed once a response arrived or the flow is closing
+                SessionEvent sessEvent = event;
+                FlowContext flowContext = flowContext(sessEvent);
+                if (flowContext != null) {
+                    return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED
+                            || flowContext.state() >= FlowContext.STATE_DISCONNECTING); // relies on the numeric order of the state constants
+                }
+
+                return true; // no flow for the session: nothing to wait for
+            }
+        }
+
+        return false; // unknown event type: left in the queue
+    }
+
+    @Override
+    public void run() { // worker loop: waits on lock, then drains the tasks of currentWindow until none are left
+        synchronized (lock) {
+            working = true;
+            while (working) {
+                try {
+                    lock.wait(); // woken by handle(), sessionInvalidated() or flowStateChanged()
+                    // NOTE(review): currentWindow is dereferenced below; if it is still null the NPE is only logged by the catch
+                    boolean dataLoopEnd = false;
+                    boolean dataEnd = false;
+
+                    for (; ; ) { // one pass handles at most one event per session task
+                        Iterator<Task> it = currentWindow.tasks.iterator();
+                        while (it.hasNext()) {
+                            Task task = it.next();
+                            switch (task.type()) {
+                                case SessionEventsTask.TYPE:
+                                    SessionEventsTask sessionTask = (SessionEventsTask) task;
+                                    if (isBlockedSession(sessionTask.session)) {
+                                        sessionTask.events.clear(); // drop everything still queued for the blocked session
+                                        it.remove();
+                                        break; // was "return", which ended the whole worker thread instead of skipping this task
+                                    }
+
+                                    if (!sessionTask.events.isEmpty()) {
+                                        Event event = sessionTask.events.get(0);
+                                        if (processSessionEvent((SessionEvent) event)) { // true means the event is done
+                                            sessionTask.events.remove(0);
+                                        }
+                                    }
+
+                                    if (sessionTask.events.isEmpty()) {
+                                        it.remove();
+                                    }
+
+                                    break;
+                                case DataLoopEndTask.TYPE:
+                                    dataLoopEnd = true;
+                                    it.remove();
+                                    break;
+                                case DataEndTask.TYPE:
+                                    dataEnd = true;
+                                    it.remove();
+                                    break;
+                            }
+                        }
+
+                        if (currentWindow.tasks.isEmpty()) {
+                            break;
+                        } else if (!flowStateChanged) { // nothing changed since the last pass: wait up to waitTimeout
+                            waitQuietly();
+                        } else {
+                            flowStateChanged = false; // a state changed during the pass: re-scan immediately
+                        }
+                    }
+
+                    if (dataLoopEnd) {
+                        closeAllConnections();
+                    }
+
+                    if (dataEnd) {
+                        working = false; // ends the outer loop after the finally block
+                    }
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                } finally {
+                    removeWindow(currentWindow); // NOTE(review): currentWindow still refers to the removed window afterwards - confirm intended
+                    processWindow = false;
+                    synchronized (readerLock) {
+                        readerLock.notifyAll(); // releases callers blocked in handle()
+                    }
+                }
+            }
+        }
+    }
+
+    private static final class HttpResponseEvent extends SessionEvent { // marker event created in addEvent() when responseSynch is set
+
+        public static final int TYPE = 1012; // returned by getType(); matched in processSessionEvent()
+
+        private final HttpResponse payload;
+
+        public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) {
+            super(sessionInfo, sourceName);
+            this.payload = payload;
+            setTimestamp(timestamp);
+        }
+
+        public HttpResponse getPayload() {
+            return payload;
+        }
+
+        @Override
+        public SessionEvent instanceForWorker(int index) { // always throws; addEvent() constructs instances directly
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public int getType() {
+            return TYPE;
+        }
+
+    }
+
+    private static interface Task { // unit of work stored in a TasksTimeWindow
+
+        public int type(); // discriminator that run() switches on (the TYPE constant of the implementing class)
+
+    }
+
+    private static final class SessionEventsTask implements Task { // ordered events of one session within one window
+
+        public static final int TYPE = 1;
+
+        private final SessionInfo session;
+
+        private final List<SessionEvent> events = new LinkedList<>(); // run() takes from index 0, addEvent() appends
+
+        public SessionEventsTask(SessionInfo session) {
+            this.session = session;
+        }
+
+        @Override
+        public int type() {
+            return TYPE;
+        }
+
+    }
+
+    private static final class DataLoopEndTask implements Task { // queued for DataEvents.DataLoopEnd; run() then calls closeAllConnections()
+
+        public static final DataLoopEndTask INSTANCE = new DataLoopEndTask(); // shared instance used by addEvent()
+
+        public static final int TYPE = 2;
+
+        @Override
+        public int type() {
+            return TYPE;
+        }
+
+    }
+
+    private static final class DataEndTask implements Task { // queued for DataEvents.DataEnd; run() then sets working to false
+
+        public static final DataEndTask INSTANCE = new DataEndTask(); // shared instance used by addEvent()
+
+        public static final int TYPE = 3;
+
+        @Override
+        public int type() {
+            return TYPE;
+        }
+
+    }
+
+    private static final class TasksTimeWindow {
+        // Half-open time range [startTime, endTime) aligned to a multiple of the period, plus the tasks queued for it.
+        private final long startTime;
+
+        private final long endTime;
+
+        private final List<Task> tasks = new LinkedList<>();
+
+        private TasksTimeWindow(long time, long period) { // aligns the window to the period-sized slot that contains time
+            long factor = time / period; // kept as long: the former (int) cast truncated quotients above Integer.MAX_VALUE
+            startTime = factor * period;
+            endTime = (factor + 1) * period;
+        }
+
+        public long startTime() {
+            return startTime;
+        }
+
+        public long endTime() {
+            return endTime;
+        }
+
+        public boolean inTimeRange(long time) { // start inclusive, end exclusive
+            return (startTime <= time && time < endTime);
+        }
+
+        public void add(Task value) {
+            tasks.add(value);
+        }
+
+        private SessionEventsTask createSessionEventsTask(SessionInfo session) { // creates and appends a new per-session task
+            SessionEventsTask task = new SessionEventsTask(session);
+            tasks.add(task);
+            return task;
+        }
+
+        public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) { // existing task of the session, or null / a new one
+            if (tasks.isEmpty()) {
+                if (create) {
+                    return createSessionEventsTask(session);
+                }
+
+                return null;
+            }
+
+            for (Task task : tasks) { // linear search; only SessionEventsTask entries are compared
+                if (task.type() == SessionEventsTask.TYPE) {
+                    SessionEventsTask sessionTask = (SessionEventsTask) task;
+                    if (sessionTask.session.equals(session)) {
+                        return sessionTask;
+                    }
+                }
+            }
+
+            if (create) {
+                return createSessionEventsTask(session);
+            }
+
+            return null;
+        }
+
+        public List<Task> values() { // returns the live internal list, not a copy
+            return tasks;
+        }
+
+        public long getPeroid() { // name (including its spelling) kept unchanged for existing callers
+            return (endTime - startTime);
+        }
+
+        @Override
+        public String toString() {
+            return "TimeWindow{"
+                    + "startTimestamp=" + startTime
+                    + ", endTimestamp=" + endTime + '}';
+        }
+
+        @Override
+        public int hashCode() { // derived from startTime and endTime only, consistent with equals
+            int hash = 5;
+            hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32));
+            hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32));
+            return hash;
+        }
+
+        @Override
+        public boolean equals(Object obj) { // windows are equal when their ranges match; tasks are not compared
+            if (this == obj) {
+                return true;
+            } else if (!(obj instanceof TasksTimeWindow)) {
+                return false;
+            }
+
+            final TasksTimeWindow other = (TasksTimeWindow) obj;
+            return this.startTime == other.startTime
+                    && this.endTime == other.endTime;
+        }
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java	Thu May 30 10:26:34 2019 +0200
@@ -13,6 +13,6 @@
                 return new HttpClientX();
         }
 
-        return null;
+        throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FilterAware.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FilterAware.java	Thu May 30 10:26:34 2019 +0200
@@ -8,10 +8,10 @@
  */
 public interface FilterAware {
 
-    public List<FlowFilter> getFilters();
+    List<FlowFilter> getFilters();
 
-    public void setFilters(Collection<FlowFilter> filters);
+    void setFilters(Collection<FlowFilter> filters);
 
-    public void addFilter(FlowFilter filter);
+    void addFilter(FlowFilter filter);
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Thu May 30 10:26:34 2019 +0200
@@ -25,6 +25,8 @@
 
     protected final Emitter emitter;
 
+    protected ClientXFactory clientFactory = new ClientXFactoryImpl();
+
     protected boolean collectMetric;
 
     protected HttpClientWorkerMetric metric;
@@ -33,8 +35,6 @@
 
     protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    protected ClientXFactory clientFactory;
-
     public FlowWorker(Emitter emitter, String name, int index) {
         super(name + index);
         Assert.notNull(emitter, "emitter");
@@ -65,6 +65,15 @@
         this.filterChain = filterChain;
     }
 
+    public ClientXFactory getClientFactory() { // factory currently set on this worker
+        return clientFactory;
+    }
+
+    public void setClientFactory(ClientXFactory clientFactory) { // replaces the worker's factory
+        Assert.notNull(clientFactory, "clientFactory"); // null is checked before assignment
+        this.clientFactory = clientFactory;
+    }
+
     protected final void fireResponseReceived(Object request, Object response, FlowContext context) {
         if (listener != null) {
             listener.responseReceived(request, response, context);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Thu May 30 10:26:34 2019 +0200
@@ -111,7 +111,6 @@
                     flowContext.clear();
                     break;
                 case FlowContext.STATE_CONNECTED:
-                    flowContext.client().init(flowContext);
                     flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
                     break;
                 case FlowContext.STATE_ERROR:
@@ -194,6 +193,10 @@
             }
 
             FlowContext flowContext = createFlowContext(session);
+            //TODO Not very optimal (presumably: a client is created and initialized per registered session)
+            ClientX client = clientFactory.create(session.getProtocolId());
+            client.init(flowContext);
+            flowContext.client(client);
             sessions.put(session, flowContext);
             return flowContext;
         }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu May 30 10:26:34 2019 +0200
@@ -234,7 +234,10 @@
                 loopEnd = true;
                 closeAllConnections();
                 filterChain.reset();
-                loop = currFlowContext.loop() + 1;
+                if (currFlowContext != null) {
+                    loop = currFlowContext.loop() + 1;
+                }
+
                 loopEnd = false;
                 return true;
             } else if (event.getType() == DataEvents.DataEnd.TYPE) {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,567 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.Assert;
-import com.passus.commons.annotations.Plugin;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.DataEvents.DataEnd;
-import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.*;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.plugin.PluginConstants;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-@Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
-public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker {
-
-    public static final String TYPE = "asynch";
-
-    private long waitTimeout = 100;
-
-    private long windowPeriod = 10;
-
-    private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>();
-
-    private TasksTimeWindow currentWindow;
-
-    private volatile boolean processWindow = false;
-
-    private volatile boolean flowStateChanged = false;
-
-    private final Object readerLock = new Object();
-
-    private boolean responseSynch = false;
-
-    public HttpAsynchClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-
-    }
-
-    public long getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    public void setWindowPeriod(long windowPeriod) {
-        Assert.greaterThanZero(windowPeriod, "tickTime");
-        this.windowPeriod = windowPeriod;
-    }
-
-    public long getWaitTimeout() {
-        return waitTimeout;
-    }
-
-    public void setWaitTimeout(long waitTimeout) {
-        Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime");
-        this.waitTimeout = waitTimeout;
-    }
-
-    public boolean isResponseSynch() {
-        return responseSynch;
-    }
-
-    public void setResponseSynch(boolean responseSynch) {
-        this.responseSynch = responseSynch;
-    }
-
-    private void waitQuietly() {
-        try {
-            lock.wait(waitTimeout);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    @Override
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            boolean wait;
-            do {
-                wait = false;
-                for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
-                        wait = true;
-                        break;
-                    }
-                }
-
-                if (wait) {
-                    try {
-                        lock.wait(100);
-                    } catch (Exception e) {
-                    }
-                }
-            } while (wait);
-
-            super.closeAllConnections();
-            while (!sessions.isEmpty()) {
-                try {
-                    lock.wait(100);
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    private void removeWindow(TasksTimeWindow window) {
-        if (window != null) {
-            windows.remove(window);
-        }
-    }
-
-    private TasksTimeWindow createWindow(long time) {
-        TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod);
-        windows.add(window);
-        return window;
-    }
-
-    private TasksTimeWindow getWindow(long time, boolean create) {
-        if (windows.isEmpty()) {
-            if (create) {
-                return createWindow(time);
-            }
-
-            return null;
-        }
-
-        TasksTimeWindow firstWindow = windows.peek();
-        if (firstWindow.inTimeRange(time)) {
-            return firstWindow;
-        } else if (windows.size() > 1) {
-            Iterator<TasksTimeWindow> it = windows.iterator();
-            it.next();
-
-            while (it.hasNext()) {
-                TasksTimeWindow window = it.next();
-                if (window.inTimeRange(time)) {
-                    return window;
-                }
-            }
-        }
-
-        if (create) {
-            return createWindow(time);
-        }
-
-        return null;
-    }
-
-    @Override
-    public void sessionInvalidated(SessionInfo session) throws Exception {
-        synchronized (lock) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Session {} invalidated.", session);
-            }
-
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-
-            addBlockedSession(session);
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    protected void flowStateChanged(FlowContext context, int oldState) {
-        synchronized (lock) {
-            flowStateChanged = true;
-            lock.notifyAll();
-        }
-    }
-
-    private void addEvent(Event event, TasksTimeWindow window) {
-        switch (event.getType()) {
-            case SessionPayloadEvent.TYPE: {
-                Event newEvent = eventInstanceForWorker(event);
-                long time = newEvent.getTimestamp();
-                SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent;
-                SessionInfo session = payloadEvent.getSessionInfo();
-
-                SessionEventsTask task = window.getSessionEventsTask(session, true);
-                task.events.add(payloadEvent);
-
-                if (responseSynch) {
-                    HttpResponse resp = (HttpResponse) payloadEvent.getResponse();
-                    HttpRequest req = (HttpRequest) payloadEvent.getRequest();
-                    long respTime = time + (resp.getTimestamp() - req.getTimestamp());
-                    HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime);
-
-                    task.events.add(respEvent);
-                }
-
-                break;
-            }
-            case SessionStatusEvent.TYPE: {
-                Event newEvent = eventInstanceForWorker(event);
-                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
-                SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true);
-                task.events.add(statusEvent);
-                break;
-            }
-            case DataLoopEnd.TYPE:
-                window.add(DataLoopEndTask.INSTANCE);
-                processWindow = true;
-                break;
-            case DataEnd.TYPE:
-                window.add(DataEndTask.INSTANCE);
-                processWindow = true;
-                break;
-        }
-    }
-
-    @Override
-    public void handle(Event event) {
-        synchronized (readerLock) {
-            while (processWindow) {
-                try {
-                    readerLock.wait();
-                } catch (InterruptedException ignore) {
-
-                }
-            }
-        }
-
-        synchronized (lock) {
-            TasksTimeWindow window = getWindow(event.getTimestamp(), true);
-            if (currentWindow == null) {
-                currentWindow = window;
-            } else if (window != currentWindow) {
-                currentWindow = windows.peek();
-                processWindow = true;
-            }
-
-            addEvent(event, window);
-            if (processWindow) {
-                lock.notifyAll();
-            }
-        }
-
-    }
-
-    private boolean processSessionEvent(SessionEvent event) {
-        switch (event.getType()) {
-            case SessionStatusEvent.TYPE: {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                    connect(statusEvent);
-                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                    FlowContext flowContext = flowContext(statusEvent);
-                    if (flowContext != null) {
-                        if (flowContext.state() != FlowContext.STATE_REQ_SENT) {
-                            close(statusEvent);
-                        }
-                    }
-                }
-
-                return true;
-            }
-            case HttpSessionPayloadEvent.TYPE: {
-                SessionEvent sessEvent = event;
-                FlowContext flowContext = flowContext(sessEvent);
-                if (flowContext != null) {
-                    switch (flowContext.state()) {
-                        case FlowContext.STATE_CONNECTING:
-                        case FlowContext.STATE_REQ_SENT:
-                            return false;
-                        case FlowContext.STATE_CONNECTED:
-                        case FlowContext.STATE_RESP_RECEIVED:
-                        case FlowContext.STATE_ERROR:
-                            if (send(flowContext, (HttpSessionPayloadEvent) event)) {
-                                return true;
-                            }
-                        case FlowContext.STATE_DISCONNECTING:
-                        case FlowContext.STATE_DISCONNECTED:
-                            if (connectPartialSession) {
-                                connect(sessEvent);
-                            } else {
-                                return true;
-                            }
-                            break;
-                    }
-                } else if (connectPartialSession) {
-                    connect(sessEvent);
-                }
-
-                return true;
-            }
-            case HttpResponseEvent.TYPE: {
-                SessionEvent sessEvent = event;
-                FlowContext flowContext = flowContext(sessEvent);
-                if (flowContext != null) {
-                    return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED
-                            || flowContext.state() >= FlowContext.STATE_DISCONNECTING);
-                }
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    lock.wait();
-
-                    boolean dataLoopEnd = false;
-                    boolean dataEnd = false;
-
-                    for (; ; ) {
-                        Iterator<Task> it = currentWindow.tasks.iterator();
-                        while (it.hasNext()) {
-                            Task task = it.next();
-                            switch (task.type()) {
-                                case SessionEventsTask.TYPE:
-                                    SessionEventsTask sessionTask = (SessionEventsTask) task;
-                                    if (isBlockedSession(sessionTask.session)) {
-                                        sessionTask.events.clear();
-                                        it.remove();
-                                        return;
-                                    }
-
-                                    if (!sessionTask.events.isEmpty()) {
-                                        Event event = sessionTask.events.get(0);
-                                        if (processSessionEvent((SessionEvent) event)) {
-                                            sessionTask.events.remove(0);
-                                        }
-                                    }
-
-                                    if (sessionTask.events.isEmpty()) {
-                                        it.remove();
-                                    }
-
-                                    break;
-                                case DataLoopEndTask.TYPE:
-                                    dataLoopEnd = true;
-                                    it.remove();
-                                    break;
-                                case DataEndTask.TYPE:
-                                    dataEnd = true;
-                                    it.remove();
-                                    break;
-                            }
-                        }
-
-                        if (currentWindow.tasks.isEmpty()) {
-                            break;
-                        } else if (!flowStateChanged) {
-                            waitQuietly();
-                        } else {
-                            flowStateChanged = false;
-                        }
-                    }
-
-                    if (dataLoopEnd) {
-                        closeAllConnections();
-                    }
-
-                    if (dataEnd) {
-                        working = false;
-                    }
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
-                } finally {
-                    removeWindow(currentWindow);
-                    processWindow = false;
-                    synchronized (readerLock) {
-                        readerLock.notifyAll();
-                    }
-                }
-            }
-        }
-    }
-
-    private static final class HttpResponseEvent extends SessionEvent {
-
-        public static final int TYPE = 1012;
-
-        private final HttpResponse payload;
-
-        public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) {
-            super(sessionInfo, sourceName);
-            this.payload = payload;
-            setTimestamp(timestamp);
-        }
-
-        public HttpResponse getPayload() {
-            return payload;
-        }
-
-        @Override
-        public SessionEvent instanceForWorker(int index) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public int getType() {
-            return TYPE;
-        }
-
-    }
-
-    private static interface Task {
-
-        public int type();
-
-    }
-
-    private static final class SessionEventsTask implements Task {
-
-        public static final int TYPE = 1;
-
-        private final SessionInfo session;
-
-        private final List<SessionEvent> events = new LinkedList<>();
-
-        public SessionEventsTask(SessionInfo session) {
-            this.session = session;
-        }
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class DataLoopEndTask implements Task {
-
-        public static final DataLoopEndTask INSTANCE = new DataLoopEndTask();
-
-        public static final int TYPE = 2;
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class DataEndTask implements Task {
-
-        public static final DataEndTask INSTANCE = new DataEndTask();
-
-        public static final int TYPE = 3;
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class TasksTimeWindow {
-
-        private final long startTime;
-
-        private final long endTime;
-
-        private final List<Task> tasks = new LinkedList<>();
-
-        private TasksTimeWindow(long time, long period) {
-            int factor = (int) (time / period);
-            startTime = factor * period;
-            endTime = (factor + 1) * period;
-        }
-
-        public long startTime() {
-            return startTime;
-        }
-
-        public long endTime() {
-            return endTime;
-        }
-
-        public boolean inTimeRange(long time) {
-            return (startTime <= time && time < endTime);
-        }
-
-        public void add(Task value) {
-            tasks.add(value);
-        }
-
-        private SessionEventsTask createSessionEventsTask(SessionInfo session) {
-            SessionEventsTask task = new SessionEventsTask(session);
-            tasks.add(task);
-            return task;
-        }
-
-        public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) {
-            if (tasks.isEmpty()) {
-                if (create) {
-                    return createSessionEventsTask(session);
-                }
-
-                return null;
-            }
-
-            for (Task task : tasks) {
-                if (task.type() == SessionEventsTask.TYPE) {
-                    SessionEventsTask sessionTask = (SessionEventsTask) task;
-                    if (sessionTask.session.equals(session)) {
-                        return sessionTask;
-                    }
-                }
-            }
-
-            if (create) {
-                return createSessionEventsTask(session);
-            }
-
-            return null;
-        }
-
-        public List<Task> values() {
-            return tasks;
-        }
-
-        public long getPeroid() {
-            return (endTime - startTime);
-        }
-
-        @Override
-        public String toString() {
-            return "TimeWindow{"
-                    + "startTimestamp=" + startTime
-                    + ", endTimestamp=" + endTime + '}';
-        }
-
-        @Override
-        public int hashCode() {
-            int hash = 5;
-            hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32));
-            hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32));
-            return hash;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            } else if (!(obj instanceof TasksTimeWindow)) {
-                return false;
-            }
-
-            final TasksTimeWindow other = (TasksTimeWindow) obj;
-            return this.startTime == other.startTime
-                    && this.endTime == other.endTime;
-        }
-
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Thu May 30 10:26:34 2019 +0200
@@ -273,9 +273,9 @@
             worker.setListeners(listeners);
             worker.setFilterChain(filterChain.instanceForWorker(i));
 
-            if (worker instanceof HttpFlowBasedClientWorker) {
-                ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor);
-            }
+//            if (worker instanceof HttpFlowBasedClientWorker) {
+//                ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor);
+//            }
 
             worker.setCollectMetrics(collectMetrics);
             worker.setConnectPartialSession(connectPartialSession);
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,683 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.Assert;
-import com.passus.commons.time.TimeAware;
-import com.passus.commons.time.TimeGenerator;
-import com.passus.data.ByteBuff;
-import com.passus.data.DataDecoder;
-import com.passus.data.HeapByteBuff;
-import com.passus.net.http.HttpFullMessageDecoder;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpRequestEncoder;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.Event;
-import com.passus.st.client.FlowContext;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.http.filter.HttpFilter;
-import com.passus.st.client.http.filter.HttpFlowUtils;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.metric.MetricsContainer;
-import org.apache.logging.log4j.Level;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.passus.st.client.FlowContext.*;
-import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
-import static com.passus.st.client.http.HttpConsts.*;
-import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware {
-
-    protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>();
-
-    private final Set<SessionInfo> blockedSessions = new HashSet<>();
-
-    private final Map<Integer, Long> timeouts = new HashMap<>();
-
-    private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder();
-
-    protected final Object lock = new Object();
-
-    protected volatile boolean working = false;
-
-    private long checkTimeoutsPeriod = 5_000;
-
-    private long nextCheckTimeoutsTime = -1;
-
-    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
-
-    private long lastEventTimestamp = -1;
-
-    protected final HttpScopes scopes = new HttpScopes();
-
-    protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
-
-    public HttpFlowBasedClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-        timeouts.putAll(DEFAULT_TIMEOUTS);
-    }
-
-    @Override
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public int activeConnections() {
-        int count = 0;
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
-                if (flowContext.state() != STATE_DISCONNECTED) {
-                    count++;
-                }
-            }
-        }
-
-        return count;
-    }
-
-    @Override
-    public TimeGenerator getTimeGenerator() {
-        return timeGenerator;
-    }
-
-    @Override
-    public void setTimeGenerator(TimeGenerator timeGenerator) {
-        Assert.notNull(timeGenerator, "timeGenerator");
-        this.timeGenerator = timeGenerator;
-    }
-
-    protected final void addBlockedSession(SessionInfo session) {
-        blockedSessions.add(session);
-    }
-
-    protected final boolean isBlockedSession(SessionInfo session) {
-        return !blockedSessions.isEmpty() && blockedSessions.contains(session);
-    }
-
-    public float getSleepFactor() {
-        return sleepFactor;
-    }
-
-    public void setSleepFactor(float sleepFactor) {
-        Assert.greaterOrEqualZero(sleepFactor, "sleepFactor");
-        this.sleepFactor = sleepFactor;
-    }
-
-    public long getCheckTimeoutsPeriod() {
-        return checkTimeoutsPeriod;
-    }
-
-    public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) {
-        Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod");
-        this.checkTimeoutsPeriod = checkTimeoutsPeriod;
-    }
-
-    protected final void changeFlowState(FlowContext flowContext, int state) {
-        try {
-            if (flowContext.state() == state) {
-                return;
-            }
-
-            int oldState = flowContext.state();
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Flow status changing {} -> {}.",
-                        contextStateToString(flowContext.state()),
-                        contextStateToString(state)
-                );
-            }
-
-            switch (state) {
-                case FlowContext.STATE_CONNECTING:
-                    flowContext.clear();
-                    break;
-                case FlowContext.STATE_CONNECTED:
-                    HttpFullMessageDecoder decoder = new HttpFullMessageDecoder();
-                    decoder.setDecodeRequest(false);
-
-                    flowContext.decoder(new HttpFullMessageDecoder());
-                    flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
-                    break;
-                case FlowContext.STATE_ERROR:
-                    changeFlowState(flowContext, STATE_DISCONNECTED);
-                    break;
-                case FlowContext.STATE_RESP_RECEIVED:
-                    flowContext.sentEvent(null);
-                    flowContext.receivedStartTimestamp(-1);
-                    break;
-                case FlowContext.STATE_DISCONNECTING:
-                    if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
-                        if (flowContext.channelContext() != null) {
-                            try {
-                                flowContext.channelContext().close();
-                            } catch (Exception e) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug(e.getMessage(), e);
-                                }
-                            }
-                        } else {
-                            changeFlowState(flowContext, STATE_DISCONNECTED);
-                        }
-                    } else {
-                        return;
-                    }
-                    break;
-                case STATE_DISCONNECTED:
-                    flowContext.sentEvent(null);
-                    flowContext.state(STATE_DISCONNECTED);
-                    flowContext.timeout(-1);
-                    flowContext.clear();
-                    removeFlowContext(flowContext);
-                    flowStateChanged(flowContext, oldState);
-                    return;
-            }
-
-            long timeout = timeouts.get(flowContext.state());
-            flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
-            flowContext.state(state);
-            flowStateChanged(flowContext, oldState);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-    }
-
-    protected void flowStateChanged(FlowContext context, int oldState) {
-
-    }
-
-    protected FlowContext flowContext(SessionEvent event) {
-        return flowContext(event.getSessionInfo());
-    }
-
-    protected FlowContext flowContext(ChannelContext context) {
-        return flowContext(context.getSessionInfo());
-    }
-
-    protected FlowContext flowContext(SessionInfo session) {
-        FlowContext context = sessions.get(session);
-        if (context == null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Context for session '" + session + "' not found.");
-            }
-        }
-
-        return context;
-    }
-
-    protected FlowContext createFlowContext(SessionInfo session) {
-        return HttpFlowUtils.createFlowContext(session, scopes);
-    }
-
-    protected FlowContext register(SessionEvent sessionEvent) {
-        return register(sessionEvent.getSessionInfo());
-    }
-
-    protected FlowContext register(SessionInfo session) {
-        synchronized (lock) {
-            if (sessions.containsKey(session)) {
-                logger.warn("Unable to register session '" + session + "'. Session already registered.");
-                return null;
-            }
-
-            FlowContext flowContext = createFlowContext(session);
-            sessions.put(session, flowContext);
-            return flowContext;
-        }
-    }
-
-    protected FlowContext connect(SessionEvent sessionEvent) {
-        return connect(sessionEvent.getSessionInfo());
-    }
-
-    protected FlowContext connect(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = register(session);
-                if (flowContext != null) {
-                    emitter.connect(session, this, index);
-                    return flowContext;
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
-                FlowContext flowContext = entry.getValue();
-                try {
-                    closeSession(flowContext);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, e.getMessage(), e);
-                    }
-                }
-            }
-
-            sessions.clear();
-            working = false;
-        }
-    }
-
-    protected void close(SessionEvent sessionEvent) {
-        close(sessionEvent.getSessionInfo());
-    }
-
-    protected void close(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void close(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = flowContext(session);
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    protected void closeSession(FlowContext flowContext) {
-        synchronized (lock) {
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-        }
-    }
-
-    protected void removeFlowContext(FlowContext flowContext) {
-        synchronized (lock) {
-            debug(flowContext, "removeFlowContext");
-            sessions.remove(flowContext.sessionInfo());
-        }
-    }
-
-    protected void reconnect(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state()));
-                }
-
-                SessionInfo session = flowContext.sessionInfo();
-                changeFlowState(flowContext, FlowContext.STATE_CONNECTING);
-                emitter.connect(session, this, index);
-            } catch (Exception e) {
-                error(flowContext, e.getMessage(), e);
-            }
-        }
-    }
-
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
-                closeSession(flowContext);
-            }
-        }
-    }
-
-    private void sleepSilently(long millis) {
-        if (millis == 0) {
-            return;
-        }
-        if (millis < 0) {
-            logger.warn("Cannot sleep for negative interval: {}.");
-            return;
-        }
-        logger.debug("Going sleep for: {}.", millis);
-        try {
-            Thread.sleep(millis);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    protected void sleep(Event event) {
-        if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
-            if (lastEventTimestamp != -1) {
-                long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
-                sleepSilently(timeToSleep);
-            }
-            lastEventTimestamp = event.getTimestamp();
-        }
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        synchronized (lock) {
-            super.writeMetrics(container);
-        }
-    }
-
-    @Override
-    public void channelActive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                            context.getLocalAddress(),
-                            context.getRemoteAddress());
-                }
-
-                flowContext.channelContext(context);
-                changeFlowState(flowContext, STATE_CONNECTED);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void channelInactive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel inactive.");
-                }
-
-                changeFlowState(flowContext, STATE_DISCONNECTED);
-            }
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
-                    HttpFullMessageDecoder decoder = (HttpFullMessageDecoder) flowContext.decoder();
-                    HttpRequest req = null;
-                    if (flowContext.sentEvent() != null) {
-                        req = ((HttpSessionPayloadEvent) flowContext.sentEvent()).getRequest();
-                    }
-
-                    if (req != null) {
-                        decoder.setRequestMethod(req.getMethod());
-                    }
-
-                    decoder.decode(data);
-                    long now = timeGenerator.currentTimeMillis();
-                    if (flowContext.receivedStartTimestamp() == -1) {
-                        flowContext.receivedStartTimestamp(now);
-                    }
-
-                    if (decoder.state() == DataDecoder.STATE_ERROR) {
-                        if (logger.isDebugEnabled()) {
-                            debug(flowContext, "Decoder error. " + decoder.getLastError());
-                        }
-
-                        decoder.clear();
-                        if (req != null) {
-                            extractHttpContext(flowContext).scopes().removeConversation(req);
-                        }
-
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                        HttpResponse resp = (HttpResponse) decoder.getResult();
-
-                        if (logger.isDebugEnabled()) {
-                            debug(flowContext,
-                                    "Response decoded (size: {} B, downloaded: {} ms, status: {})",
-                                    decoder.getHeaderSize() + decoder.getContentSize(),
-                                    now - flowContext.receivedStartTimestamp(),
-                                    resp.getStatus().getCode()
-                            );
-                        }
-
-                        if (collectMetric) {
-                            synchronized (metric) {
-                                metric.incResponsesNum();
-                                metric.addResponseStatusCode(resp.getStatus().getCode());
-                                metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
-                                metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
-                                if (req != null) {
-                                    metric.addResponseTime(now - (long) req.getTag(TAG_TIME_START));
-                                }
-                            }
-                        }
-
-                        resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize());
-                        resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize());
-                        resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp());
-                        resp.setTag(TAG_TIME_END, now);
-
-                        if (filterChain.filterInbound(req, resp, flowContext) != HttpFilter.DENY) {
-                            try {
-                                fireResponseReceived(req, resp, flowContext);
-                            } catch (Exception e) {
-                                error(flowContext, e.getMessage(), e);
-                            }
-                        }
-
-                        if (req != null) {
-                            extractHttpContext(flowContext).scopes().removeConversation(req);
-                        }
-                        decoder.clear();
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    }
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void dataWriteStart(ChannelContext context) {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
-                flowContext.sendStartTimestamp(now);
-                ((HttpSessionPayloadEvent) flowContext.sentEvent()).getRequest().setTag(TAG_TIME_START, now);
-            }
-        }
-    }
-
-    @Override
-    public void dataWritten(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
-                    }
-                }
-
-                ((HttpSessionPayloadEvent) (flowContext.sentEvent())).getRequest().setTag(TAG_TIME_END, now);
-            }
-
-            lock.notifyAll();
-
-        }
-    }
-
-    @Override
-    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Error occured. " + cause.getMessage(), cause);
-        }
-
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_ERROR);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    protected boolean send(FlowContext context, HttpSessionPayloadEvent event) {
-        synchronized (lock) {
-            if (event.getRequest() != null) {
-                HttpRequest req = event.getRequest();
-                if (filterChain.filterOutbound(req, event.getResponse(), context) == HttpFilter.DENY) {
-                    return false;
-                }
-
-                reqEncoder.encodeHeaders(req, context.buffer());
-                long headerSize = context.buffer().readableBytes();
-                reqEncoder.encodeContent(req, context.buffer());
-
-                req.setTag(TAG_HEADER_SIZE, headerSize);
-                req.setTag(TAG_CONTENT_SIZE, context.buffer().readableBytes() - headerSize);
-
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.incRequestsNum();
-                        metric.addRequestSize(context.buffer().readableBytes());
-                        //metric.addRequestUrl(req.getUrl());
-                    }
-                }
-
-                try {
-                    changeFlowState(context, FlowContext.STATE_REQ_SENT);
-                    context.sentEvent(event);
-
-                    context.channelContext().writeAndFlush(context.buffer());
-                    if (logger.isDebugEnabled()) {
-                        debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer().length());
-                    }
-
-                    context.buffer().clear();
-
-                    return true;
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(context, e.getMessage(), e);
-                    }
-                }
-            }
-        }
-
-        return false;
-    }
-
-    protected void processTimeouts() {
-        synchronized (lock) {
-            try {
-                long now = timeGenerator.currentTimeMillis();
-                if (nextCheckTimeoutsTime == -1) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                } else if (nextCheckTimeoutsTime > now) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                    for (FlowContext flowContext : sessions.values()) {
-                        if (flowContext.timeouted()) {
-                            if (logger.isDebugEnabled()) {
-                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
-                                        flowContext.sessionInfo(),
-                                        contextStateToString(flowContext.state()));
-                            }
-
-                            switch (flowContext.state()) {
-                                case FlowContext.STATE_CONNECTING:
-                                case FlowContext.STATE_CONNECTED:
-                                case FlowContext.STATE_REQ_SENT:
-                                case FlowContext.STATE_ERROR:
-                                    closeSession(flowContext);
-                                    break;
-                                case FlowContext.STATE_RESP_RECEIVED:
-                                    //Dziwny blad nie powinien wystepowac
-                                    break;
-                                case FlowContext.STATE_DISCONNECTING:
-                                case STATE_DISCONNECTED:
-                                    removeFlowContext(flowContext);
-                                    break;
-                            }
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    protected Event eventInstanceForWorker(Event event) {
-        if (event instanceof SessionEvent) {
-            Event newEvent = ((SessionEvent) event).instanceForWorker(index);
-            newEvent.setTimestamp(event.getTimestamp());
-            return newEvent;
-        } else {
-            return event;
-        }
-    }
-
-    protected final void trace(FlowContext flowContext, String message, Object... args) {
-        log(flowContext, Level.TRACE, message, args);
-    }
-
-    protected final void debug(FlowContext flowContext, String message, Throwable cause) {
-        log(flowContext, Level.DEBUG, message, cause);
-    }
-
-    protected final void error(FlowContext flowContext, String message, Throwable cause) {
-        log(flowContext, Level.ERROR, message, cause);
-    }
-
-    protected final void log(FlowContext flowContext, Level level, String message, Throwable cause) {
-        message = String.format("%s [%s]", message, flowContext.sessionInfo());
-        logger.log(level, message, cause);
-    }
-
-    protected final void debug(FlowContext flowContext, String message, Object... args) {
-        log(flowContext, Level.DEBUG, message, args);
-    }
-
-    protected final void log(FlowContext flowContext, Level level, String message, Object... args) {
-        if (args.length > 0) {
-            message = String.format(message, args);
-        }
-
-        SessionInfo session = flowContext.sessionInfo();
-        if (args.length == 0) {
-            logger.log(level, message + " [{}]", session);
-        } else {
-            Object[] logArgs = new Object[args.length + 1];
-            for (int i = 0; i < args.length; i++) {
-                logArgs[i] = args[i];
-            }
-
-            logArgs[logArgs.length - 1] = session;
-            logger.log(level, message + " [{}]", logArgs);
-        }
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,48 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.st.client.DataEvents;
-import com.passus.st.client.Event;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-@Deprecated
-public class HttpNullClientWorker extends HttpClientWorker {
-
-    private boolean working = true;
-
-    public HttpNullClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-    }
-
-    @Override
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public int activeConnections() {
-        return 0;
-    }
-
-    @Override
-    public void close() {
-        working = false;
-    }
-
-    @Override
-    public void close(SessionInfo session) {
-
-    }
-
-    @Override
-    public void handle(Event event) {
-        if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
-            working = false;
-        }
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,319 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.Assert;
-import com.passus.commons.annotations.Plugin;
-import com.passus.st.client.DataEvents.DataEnd;
-import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.Event;
-import com.passus.st.client.FlowContext;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.plugin.PluginConstants;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Semaphore;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-@Plugin(name = HttpParallelClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
-public final class HttpParallelClientWorker extends HttpFlowBasedClientWorker {
-
-    public static final String TYPE = "parallel";
-
-    public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
-
-    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
-
-    private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS;
-
-    private long eventsQueueWaitTime = 100;
-
-    private final Semaphore semaphore = new Semaphore(maxSentRequests);
-
-    private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>();
-
-    private boolean closeAllConnections = false;
-
-    public HttpParallelClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-    }
-
-    public int getMaxSentRequests() {
-        return maxSentRequests;
-    }
-
-    public void setMaxSentRequests(int maxSentRequests) {
-        Assert.greaterThanZero(maxSentRequests, "maxSentRequests");
-        this.maxSentRequests = maxSentRequests;
-    }
-
-    @Override
-    protected LocalFlowContext flowContext(SessionInfo session) {
-        return (LocalFlowContext) super.flowContext(session);
-    }
-
-    @Override
-    protected LocalFlowContext flowContext(ChannelContext context) {
-        return flowContext(context.getSessionInfo());
-    }
-
-    @Override
-    protected LocalFlowContext flowContext(SessionEvent event) {
-        return flowContext(event.getSessionInfo());
-    }
-
-    public long getEventsQueueWaitTime() {
-        return eventsQueueWaitTime;
-    }
-
-    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
-        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
-        this.eventsQueueWaitTime = eventsQueueWaitTime;
-    }
-
-    @Override
-    protected void removeFlowContext(FlowContext flowContext) {
-        if (flowContext != null) {
-            flowIndex.remove(flowContext);
-        }
-    }
-
-    @Override
-    protected LocalFlowContext createFlowContext(SessionInfo session) {
-        LocalFlowContext flowContext = new LocalFlowContext(session, scopes);
-        flowIndex.add(flowContext);
-        return flowContext;
-    }
-
-    private void waitCloseAllConnections() {
-        closeAllConnections = true;
-        synchronized (lock) {
-            while (!flowIndex.isEmpty()) {
-                try {
-                    lock.wait(10);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        }
-
-        closeAllConnections = false;
-    }
-
-    @Override
-    protected boolean send(FlowContext flowContext, HttpSessionPayloadEvent event) {
-        //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy
-        //najmniej uzywane.
-        if (flowIndex.size() > maxSentRequests) {
-            int diff = flowIndex.size() - maxSentRequests;
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Too many connections {}.", flowIndex.size());
-            }
-
-            Iterator<LocalFlowContext> it = flowIndex.descendingIterator();
-            while (it.hasNext()) {
-                LocalFlowContext indexFlowContext = it.next();
-                if (indexFlowContext.eventsQueue.isEmpty()
-                        && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
-                    close(flowContext);
-                    if (--diff == 0) {
-                        break;
-                    }
-                }
-            }
-        }
-
-        return super.send(flowContext, event);
-    }
-
-    private boolean canSend(FlowContext flowContext) {
-        int state = flowContext.state();
-        return (state == FlowContext.STATE_CONNECTED
-                || state == FlowContext.STATE_RESP_RECEIVED
-                || state == FlowContext.STATE_ERROR
-                || state == FlowContext.STATE_REQ_SENT);
-    }
-
-    @Override
-    protected void flowStateChanged(FlowContext flowContext, int oldState) {
-        LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
-        if (oldState == FlowContext.STATE_REQ_SENT) {
-            if (semaphore.availablePermits() <= maxSentRequests) {
-                semaphore.release();
-            }
-        }
-
-        if (closeAllConnections) {
-            if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                    && localFlowContext.state() != FlowContext.STATE_REQ_SENT
-                    && localFlowContext.eventsQueue.isEmpty()) {
-                close(flowContext);
-                return;
-            }
-        }
-
-        if (localFlowContext.state() >= FlowContext.STATE_CONNECTED
-                && localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                && localFlowContext.state() != FlowContext.STATE_REQ_SENT
-                && !localFlowContext.eventsQueue.isEmpty()) {
-
-            Event event = localFlowContext.eventsQueue.peek();
-            if (event.getType() == SessionStatusEvent.TYPE) {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                    localFlowContext.eventsQueue.poll();
-                    close((SessionStatusEvent) event);
-                }
-            } else if (event.getType() == HttpSessionPayloadEvent.TYPE
-                    && canSend(flowContext)) {
-                localFlowContext.eventsQueue.poll();
-                send(flowContext, (HttpSessionPayloadEvent) event);
-            } else {
-                localFlowContext.eventsQueue.poll();
-            }
-        }
-    }
-
-    private void makeFirst(LocalFlowContext flowContext) {
-        synchronized (lock) {
-            flowIndex.remove(flowContext);
-            flowIndex.addFirst(flowContext);
-        }
-    }
-
-    private void addToQueue(LocalFlowContext flowContext, Event event) {
-        flowContext.eventsQueue.add(event);
-        makeFirst(flowContext);
-    }
-
-    @Override
-    public void handle(Event event) {
-        Event newEvent = null;
-        switch (event.getType()) {
-            case HttpSessionPayloadEvent.TYPE:
-                semaphore.acquireUninterruptibly();
-                newEvent = eventInstanceForWorker(event);
-                break;
-            case SessionStatusEvent.TYPE:
-            case DataLoopEnd.TYPE:
-            case DataEnd.TYPE:
-                newEvent = event;
-        }
-
-        if (newEvent != null) {
-            synchronized (lock) {
-                try {
-                    eventsQueue.put(newEvent);
-                    lock.notifyAll();
-                } catch (Exception e) {
-                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    private void processEvent(Event event) {
-        if (event instanceof SessionEvent) {
-            switch (event.getType()) {
-                case SessionStatusEvent.TYPE:
-                    SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                        LocalFlowContext flowContext = flowContext((SessionEvent) event);
-                        if (flowContext != null) {
-                            if (flowContext.eventsQueue.isEmpty()
-                                    && flowContext.state() != FlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
-                            } else {
-                                addToQueue(flowContext, event);
-                            }
-                        }
-                    }
-                    break;
-                case HttpSessionPayloadEvent.TYPE: {
-                    HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
-                    LocalFlowContext flowContext = flowContext(payloadEvent);
-                    if (flowContext != null) {
-                        if (flowContext.state() >= FlowContext.STATE_CONNECTING
-                                && flowContext.state() < FlowContext.STATE_DISCONNECTING) {
-                            if (flowContext.eventsQueue.isEmpty()
-                                    && (flowContext.state() == FlowContext.STATE_CONNECTED
-                                    || flowContext.state() == FlowContext.STATE_ERROR
-                                    || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) {
-                                send(flowContext, payloadEvent);
-                            } else {
-                                addToQueue(flowContext, event);
-                            }
-                        }
-                    } else {
-                        try {
-                            SessionInfo session = payloadEvent.getSessionInfo();
-                            flowContext = (LocalFlowContext) register(session);
-                            addToQueue(flowContext, event);
-                            emitter.connect(session, this, index);
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
-
-                    break;
-                }
-            }
-
-        } else if (event.getType() == DataLoopEnd.TYPE) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("DataLoopEnd received.");
-            }
-
-            waitCloseAllConnections();
-            filterChain.reset();
-        } else if (event.getType() == DataEnd.TYPE) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("DataEnd received. Deactivation.");
-            }
-
-            working = false;
-        }
-
-    }
-
-    @Override
-    public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    try {
-                        lock.wait(eventsQueueWaitTime);
-                    } catch (InterruptedException ignore) {
-                    }
-
-                    Event event;
-                    while ((event = eventsQueue.poll()) != null) {
-                        processEvent(event);
-                    }
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
-                }
-            }
-        }
-    }
-
-    protected static class LocalFlowContext extends FlowContext {
-
-        private final Queue<Event> eventsQueue;
-
-        private LocalFlowContext(SessionInfo session, HttpScopes scopes) {
-            super(session);
-            setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(this, scopes));
-            eventsQueue = new LinkedList<>();
-        }
-
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.net.http.HttpMessageHelper;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.emitter.SessionInfo;
-
-import static com.passus.st.Protocols.HTTP;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpRequest, HttpResponse> {
-
-    public static final int TYPE = 12;
-
-    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) {
-        super(sessionInfo, req, resp, HTTP, sourceName);
-    }
-
-    @Override
-    public int getType() {
-        return TYPE;
-    }
-
-    @Override
-    public String toString() {
-        String req = null;
-        String resp = null;
-        HttpMessageHelper helper = HttpMessageHelper.get();
-
-        if (getRequest() != null) {
-            req = helper.firstLineToString(getRequest());
-        }
-
-        if (getResponse() != null) {
-            resp = helper.firstLineToString(getResponse());
-        }
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("HttpSessionPayloadEvent{")
-                .append("timestamp=").append(getTimestamp()).append(", ")
-                .append("session=").append(getSessionInfo()).append(", ")
-                .append("request=").append(req).append(", ")
-                .append("response=").append(resp).append("}");
-        return sb.toString();
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,74 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.net.http.HttpHeaders;
-import com.passus.net.http.HttpRequest;
-import com.passus.st.client.DataEvents;
-import com.passus.st.client.Event;
-import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-public class HttpStresserWorker extends HttpClientWorker {
-
-    private int maxRequests = -1;
-
-    private int sentRequests = 0;
-
-    private boolean working = true;
-
-    public HttpStresserWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-    }
-
-    @Override
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public int activeConnections() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    private void send(HttpRequest req) {
-        req.getHeaders().delete(HttpHeaders.KEEP_ALIVE);
-
-    }
-
-    @Override
-    public void close(SessionInfo session) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    private void waitAndClose() {
-        working = false;
-    }
-
-    @Override
-    public void handle(Event event) {
-        if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
-            waitAndClose();
-        } else if (event.getType() != SessionPayloadEvent.TYPE) {
-            return;
-        }
-
-        SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
-        HttpRequest req = (HttpRequest) payloadEvent.getRequest();
-        send(req);
-    }
-
-    @Override
-    public void run() {
-
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,303 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.Assert;
-import com.passus.commons.annotations.Plugin;
-import com.passus.st.client.DataEvents.DataEnd;
-import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.Event;
-import com.passus.st.client.FlowContext;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.plugin.PluginConstants;
-
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static com.passus.st.client.FlowContext.contextStateToString;
-
-/**
- * @author Mirosław Hawrot
- */
-@Deprecated
-@Plugin(name = HttpSynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
-public class HttpSynchClientWorker extends HttpFlowBasedClientWorker {
-
-    public static final String TYPE = "synch";
-
-    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
-
-    private long eventsQueueWaitTime = 100;
-
-    /**
-     * Context dla ktorego wykonywana jest operacja.
-     */
-    private FlowContext currFlowContext;
-
-    private boolean loopEnd = false;
-
-    private int loop = 0;
-
-    public HttpSynchClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-    }
-
-    @Override
-    public boolean isWorking() {
-        return working;
-    }
-
-    public long getEventsQueueWaitTime() {
-        return eventsQueueWaitTime;
-    }
-
-    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
-        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
-        this.eventsQueueWaitTime = eventsQueueWaitTime;
-    }
-
-    @Override
-    public void sessionInvalidated(SessionInfo session) throws Exception {
-        synchronized (lock) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Session {} invalidated.", session);
-            }
-
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-
-            addBlockedSession(session);
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    protected void flowStateChanged(FlowContext context, int oldState) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state()));
-        }
-
-        if (context == currFlowContext) {
-            if (context.state() == FlowContext.STATE_CONNECTED
-                    || context.state() == FlowContext.STATE_RESP_RECEIVED
-                    || context.state() == FlowContext.STATE_ERROR
-                    || context.state() == FlowContext.STATE_DISCONNECTED) {
-                currFlowContext = null;
-            }
-        }
-    }
-
-    @Override
-    public void handle(Event event) {
-        Event newEvent = eventInstanceForWorker(event);
-        synchronized (lock) {
-            try {
-                eventsQueue.put(newEvent);
-            } catch (Exception e) {
-                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            boolean wait;
-            do {
-                wait = false;
-                for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
-                        wait = true;
-                        break;
-                    }
-                }
-
-                if (wait) {
-                    try {
-                        lock.wait(100);
-                    } catch (Exception e) {
-                    }
-                }
-            } while (wait);
-
-            super.closeAllConnections();
-            while (!sessions.isEmpty()) {
-                try {
-                    lock.wait(100);
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    /**
-     * Returns true if next event should be processed immediately.
-     *
-     * @return boolean
-     */
-    private boolean pollNext() {
-        if (currFlowContext != null) {
-            return false;
-        }
-
-        Event event = eventsQueue.poll();
-        if (event != null) {
-            sleep(event);
-            if (logger.isTraceEnabled()) {
-                logger.trace("Event processing: {}", event);
-            }
-
-            if (event instanceof SessionEvent) {
-                SessionEvent sessEvent = (SessionEvent) event;
-                if (isBlockedSession(sessEvent.getSessionInfo())) {
-                    return true;
-                }
-
-                if (event.getType() == SessionStatusEvent.TYPE) {
-                    SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
-                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                        try {
-                            currFlowContext = register(statusEvent);
-                            if (currFlowContext != null) {
-                                currFlowContext.loop(loop);
-                                emitter.connect(statusEvent.getSessionInfo(), this, index);
-                            }
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                        }
-
-                        return (currFlowContext == null);
-                    } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                        currFlowContext = flowContext((SessionEvent) event);
-                        if (currFlowContext != null) {
-                            if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
-                            }
-                        }
-                    }
-
-                    return true;
-                } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-                    FlowContext flowContext = flowContext(sessEvent);
-                    if (flowContext != null) {
-                        switch (flowContext.state()) {
-                            case FlowContext.STATE_CONNECTED:
-                            case FlowContext.STATE_RESP_RECEIVED:
-                            case FlowContext.STATE_ERROR:
-                                currFlowContext = flowContext;
-                                if (send(flowContext, (HttpSessionPayloadEvent) event)) {
-                                    return false;
-                                } else {
-                                    currFlowContext = null;
-                                    return true;
-                                }
-                            case FlowContext.STATE_DISCONNECTING:
-                            case FlowContext.STATE_DISCONNECTED:
-                                if (connectPartialSession) {
-                                    currFlowContext = register(sessEvent);
-                                    if (currFlowContext != null) {
-                                        try {
-                                            currFlowContext.loop(loop);
-                                            emitter.connect(sessEvent.getSessionInfo(), this, index);
-                                        } catch (IOException e) {
-                                            logger.error(e.getMessage(), e);
-                                            currFlowContext = null;
-                                        }
-                                    }
-
-                                    return false;
-                                } else {
-                                    return true;
-                                }
-                            default:
-                                return false;
-                        }
-                    } else if (connectPartialSession) {
-                        currFlowContext = register(sessEvent);
-                        if (currFlowContext != null) {
-                            try {
-                                currFlowContext.loop(loop);
-                                emitter.connect(sessEvent.getSessionInfo(), this, index);
-                                eventsQueue.addFirst(sessEvent);
-                            } catch (IOException e) {
-                                logger.error(e.getMessage(), e);
-                                currFlowContext = null;
-                            }
-
-                            return false;
-                        } else {
-                            return true;
-                        }
-                    }
-
-                    return true;
-                } else {
-                    return true;
-                }
-            } else if (event.getType() == DataLoopEnd.TYPE) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DataLoopEnd received.");
-                }
-
-                loopEnd = true;
-                closeAllConnections();
-                filterChain.reset();
-                loop = currFlowContext.loop() + 1;
-                loopEnd = false;
-                return true;
-            } else if (event.getType() == DataEnd.TYPE) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DataEnd received. Deactivation.");
-                }
-
-                working = false;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            eventsQueue.clear();
-            super.close();
-            working = false;
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    try {
-                        lock.wait(eventsQueueWaitTime);
-                    } catch (InterruptedException ignore) {
-                    }
-
-                    boolean nextPoll;
-                    do {
-                        if (loopEnd || !working) {
-                            break;
-                        }
-
-                        nextPoll = pollNext();
-                    } while (nextPoll);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
-                }
-            }
-        }
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Thu May 30 10:26:34 2019 +0200
@@ -2,25 +2,15 @@
 
 import com.passus.commons.Assert;
 import com.passus.data.*;
-import com.passus.net.http.HttpCachedHeadersImpl;
-import com.passus.net.http.HttpConsts;
-import com.passus.net.http.HttpHeaders;
-import com.passus.net.http.HttpHeadersDecoder;
-import com.passus.net.http.HttpHeadersImpl;
-import com.passus.net.http.HttpMessage;
-import com.passus.net.http.HttpMethod;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.HttpStatus;
+import com.passus.net.http.*;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.http.HttpReqResp;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
-import static com.passus.st.reader.nc.NcHttpDataUtils.CUSTOM_HEADER_CODE;
-import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST;
-import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE;
+
 import java.io.IOException;
 
+import static com.passus.st.reader.nc.NcHttpDataUtils.*;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class HttpSessionPayloadEventDataReader {
@@ -113,7 +103,7 @@
 
         if (headerSize > 0) {
             int startIndex = buffer.startIndex();
-            for (;;) {
+            for (; ; ) {
                 byte headerCode = buffer.read();
                 ByteString headerName;
                 if (headerCode == CUSTOM_HEADER_CODE) {
@@ -226,7 +216,7 @@
         return new HttpReqResp(req, resp);
     }
 
-    public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException {
+    public SessionPayloadEvent read(NcDataBlockReader reader) throws IOException {
         return null;
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java	Thu May 30 10:26:34 2019 +0200
@@ -0,0 +1,76 @@
+package com.passus.st.client;
+
+import com.passus.commons.service.ServiceUtils;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class AsynchFlowWorkerTest extends AbstractWireMockTest {
+
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    @BeforeMethod
+    public void beforeMethod() {
+        String content = "test";
+        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", "" + content.length())
+                        .withBody(content)));
+    }
+
+    @Test(enabled = false) // NOTE(review): HttpAsynchClientWorkerTest.testHandle, removed in this changeset, was disabled too; reason not recorded
+    public void testHandle() throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
+        emitter.start();
+
+        TestHttpClientListener listener = new TestHttpClientListener();
+
+        AsynchFlowWorker worker = new AsynchFlowWorker(emitter, "test", 0);
+        try {
+            worker.setListener(listener);
+            worker.start();
+            // Report the session as established, stamped with the first event's timestamp, before replaying the events.
+            SessionEvent sessionEvent = (SessionEvent) events.get(0);
+            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED);
+            statusEvent.setTimestamp(sessionEvent.getTimestamp());
+            worker.handle(statusEvent);
+
+            events.forEach(worker::handle);
+
+            worker.join();
+            assertTrue(listener.size() > 0);
+            assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
+            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0);
+            String responseStr = event.getResponse().toString();
+            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
+            assertTrue(responseStr.endsWith("test"));
+        } finally {
+            ServiceUtils.stopQuietly(emitter);
+        }
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Thu May 30 10:26:34 2019 +0200
@@ -0,0 +1,72 @@
+package com.passus.st.client;
+
+import com.passus.commons.service.ServiceUtils;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class ParallelFlowWorkerTest extends AbstractWireMockTest {
+
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    @BeforeMethod
+    public void beforeMethod() {
+        String content = "test";
+        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", "" + content.length())
+                        .withBody(content)));
+    }
+
+    /** Replays the events read from the capture through a ParallelFlowWorker and checks the first listener event. */
+    @Test
+    public void testHandle() throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
+        emitter.start();
+
+        TestHttpClientListener listener = new TestHttpClientListener();
+
+        ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0);
+        try {
+            worker.setListener(listener);
+            worker.start();
+
+            events.forEach(worker::handle);
+
+            worker.join();
+            assertTrue(listener.size() > 0);
+            assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
+            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0);
+            String responseStr = event.getResponse().toString();
+            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
+            assertTrue(responseStr.endsWith("test"));
+        } finally {
+            ServiceUtils.stopQuietly(emitter);
+        }
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu May 30 10:26:34 2019 +0200
@@ -0,0 +1,241 @@
+package com.passus.st.client;
+
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.net.http.HttpResponseEncoder;
+import com.passus.st.emitter.*;
+import com.passus.st.metric.MetricsContainer;
+import com.passus.st.utils.EventUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+
+import static com.passus.st.utils.Assert.assertHttpClientEvents;
+import static org.testng.AssertJUnit.assertEquals;
+
+public class SynchFlowWorkerTest {
+
+    private final TestHttpClientListener listener = new TestHttpClientListener();
+
+    private static class LocalEmitter implements Emitter {
+
+        private SessionMapper sessionMapper;
+
+        private boolean started = false;
+
+        @Override
+        public void setSessionMapper(SessionMapper sessionMapper) {
+            this.sessionMapper = sessionMapper;
+        }
+
+        @Override
+        public SessionMapper getSessionMapper() {
+            return sessionMapper;
+        }
+
+        @Override
+        public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
+            LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
+            try {
+                handler.channelRegistered(channelContext);
+                handler.channelActive(channelContext);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        /** Encodes the response of the event returned by the flow context's sentEvent() and passes it to dataReceived. */
+        protected void flush(LocalChannelContext channelContext) {
+            SessionInfo sessionInfo = channelContext.getSessionInfo();
+            SynchFlowWorker clientWorker = (SynchFlowWorker) channelContext.handler;
+            FlowContext flowContext = clientWorker.flowContext(sessionInfo);
+            SessionPayloadEvent event = flowContext.sentEvent();
+
+            HttpResponse response = (HttpResponse) event.getResponse();
+            ByteBuff buff = new HeapByteBuff();
+            HttpResponseEncoder.INSTANCE.encode(response, buff);
+            // Failures are only printed, matching connect() and close(); they do not propagate to the caller.
+            try {
+                clientWorker.dataReceived(channelContext, buff);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        protected void close(LocalChannelContext channelContext) {
+            try {
+                channelContext.handler.channelInactive(channelContext);
+                channelContext.handler.channelUnregistered(channelContext);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        @Override
+        public boolean isStarted() {
+            return started;
+        }
+
+        @Override
+        public void start() {
+            if (sessionMapper == null) {
+                sessionMapper = new PassThroughSessionMapper();
+            }
+
+            started = true;
+        }
+
+        @Override
+        public void stop() {
+            started = false;
+        }
+
+        @Override
+        public boolean isCollectMetrics() {
+            return false;
+        }
+
+        @Override
+        public void setCollectMetrics(boolean collectMetrics) {
+        }
+
+        @Override
+        public void writeMetrics(MetricsContainer container) {
+        }
+
+        @Override
+        public void configure(Configuration config, ConfigurationContext context) {
+        }
+    }
+
+    private static class LocalChannelContext implements ChannelContext {
+
+        private final LocalEmitter emitter;
+
+        private final EmitterHandler handler;
+
+        private final SessionInfo sessionInfo;
+
+        private final Queue<ByteBuffer> dataQueue;
+
+        private SocketAddress localAddress;
+
+        private SocketAddress remoteAddress;
+
+        public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+            this.emitter = emitter;
+            this.handler = handler;
+            this.remoteAddress = remoteAddress;
+            this.sessionInfo = sessionInfo;
+            this.dataQueue = new LinkedList<>();
+        }
+
+        @Override
+        public boolean isConnected() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public boolean isConnectionPending() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        /**
+         * Appends a buffer wrapping {@code length} bytes of {@code data}, starting at {@code offset}, to the data queue.
+         */
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            dataQueue.add(ByteBuffer.wrap(data, offset, length));
+        }
+
+        /** Appends the result of {@code data.toNioByteBuffer()} to the data queue. */
+        @Override
+        public void write(ByteBuff data) throws IOException {
+            dataQueue.add(data.toNioByteBuffer());
+        }
+
+        @Override
+        public void flush() throws IOException {
+            emitter.flush(this);
+        }
+
+        @Override
+        public void close() throws IOException {
+            emitter.close(this);
+        }
+
+        @Override
+        public SocketAddress getLocalAddress() {
+            return localAddress;
+        }
+
+        @Override
+        public SocketAddress getRemoteAddress() {
+            return remoteAddress;
+        }
+
+        @Override
+        public SessionInfo getSessionInfo() {
+            return sessionInfo;
+        }
+
+    }
+
+    private SynchFlowWorker createWorker() {
+        LocalEmitter emitter = new LocalEmitter();
+        SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0);
+        worker.setListener(listener);
+        return worker;
+    }
+
+    private List<Event> readEvents(String pcapFile) throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        return EventUtils.readEvents(pcapFile, props);
+    }
+
+    @AfterMethod
+    public void afterMethod() {
+        listener.clear();
+    }
+
+    @Test
+    public void testHandle_HTTP_SimpleRequestResponse() throws Exception {
+        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
+        assertEquals(4, events.size());
+
+        SynchFlowWorker worker = createWorker();
+        worker.start();
+        SessionEvent sessionEvent = (SessionEvent) events.get(0);
+        worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
+        events.forEach(worker::handle);
+        worker.join();
+
+        assertHttpClientEvents(events, listener.events());
+    }
+
+    @Test
+    public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception {
+        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
+        assertEquals(4, events.size());
+
+        SynchFlowWorker worker = createWorker();
+        worker.setConnectPartialSession(true);
+        worker.start();
+        events.forEach(worker::handle);
+        worker.join();
+
+        assertHttpClientEvents(events, listener.events());
+    }
+}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,86 +0,0 @@
-package com.passus.st.client.http;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
-import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import com.passus.commons.service.ServiceUtils;
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.client.Event;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.TestHttpClientListener;
-import com.passus.st.emitter.RuleBasedSessionMapper;
-import com.passus.st.emitter.nio.NioEmitter;
-import com.passus.st.utils.EventUtils;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class HttpAsynchClientWorkerTest extends AbstractWireMockTest {
-
-    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
-        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
-        sessionMapper.addRule(mapperRule);
-
-        NioEmitter emitter = new NioEmitter();
-        emitter.setSessionMapper(sessionMapper);
-        return emitter;
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
-        String content = "test";
-        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
-                .willReturn(aResponse()
-                        .withHeader("Content-Type", "text/plain")
-                        .withHeader("Content-Length", "" + content.length())
-                        .withBody(content)));
-    }
-
-    @Test(enabled = false)
-    public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
-        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
-        assertEquals(4, events.size());
-
-        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
-        emitter.start();
-
-        TestHttpClientListener listner = new TestHttpClientListener();
-
-        HttpAsynchClientWorker worker = new HttpAsynchClientWorker(emitter, "test", 0);
-        try {
-            worker.setListeners(Arrays.asList(listner));
-            worker.start();
-
-            SessionEvent sessionEvent = (SessionEvent) events.get(0);
-            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED);
-            statusEvent.setTimestamp(sessionEvent.getTimestamp());
-            worker.handle(statusEvent);
-
-            events.forEach(worker::handle);
-
-            worker.join();
-            assertTrue(listner.size() > 0);
-            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
-            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
-            String responseStr = event.getResponse().toString();
-            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
-            assertTrue(responseStr.endsWith("test"));
-        } finally {
-            ServiceUtils.stopQuietly(emitter);
-        }
-    }
-
-}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpParallelClientWorkerTest.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,80 +0,0 @@
-package com.passus.st.client.http;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
-import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import com.passus.commons.service.ServiceUtils;
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.client.Event;
-import com.passus.st.client.TestHttpClientListener;
-import com.passus.st.emitter.RuleBasedSessionMapper;
-import com.passus.st.emitter.nio.NioEmitter;
-import com.passus.st.utils.EventUtils;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class HttpParallelClientWorkerTest extends AbstractWireMockTest {
-
-    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
-        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
-        sessionMapper.addRule(mapperRule);
-
-        NioEmitter emitter = new NioEmitter();
-        emitter.setSessionMapper(sessionMapper);
-        return emitter;
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
-        String content = "test";
-        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
-                .willReturn(aResponse()
-                        .withHeader("Content-Type", "text/plain")
-                        .withHeader("Content-Length", "" + content.length())
-                        .withBody(content)));
-    }
-
-    @Test
-    public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
-        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
-        assertEquals(4, events.size());
-
-        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
-        emitter.start();
-
-        TestHttpClientListener listner = new TestHttpClientListener();
-
-        HttpParallelClientWorker worker = new HttpParallelClientWorker(emitter, "test", 0);
-        try {
-            worker.setListeners(Arrays.asList(listner));
-            worker.start();
-
-            events.forEach(worker::handle);
-
-            worker.join();
-            assertTrue(listner.size() > 0);
-            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
-            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
-            String responseStr = event.getResponse().toString();
-            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
-            assertTrue(responseStr.endsWith("test"));
-        } finally {
-            ServiceUtils.stopQuietly(emitter);
-        }
-
-    }
-
-}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java	Wed May 29 15:03:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,249 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.config.Configuration;
-import com.passus.config.ConfigurationContext;
-import com.passus.data.ByteBuff;
-import com.passus.data.HeapByteBuff;
-import com.passus.net.SocketAddress;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.HttpResponseEncoder;
-import com.passus.st.client.*;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.PassThroughSessionMapper;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
-import com.passus.st.metric.MetricsContainer;
-import com.passus.st.utils.EventUtils;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
-import org.testng.annotations.Test;
-import static com.passus.st.utils.Assert.*;
-import org.testng.annotations.AfterMethod;
-
-/**
- *
- * @author mikolaj.podbielski
- * @author miroslaw.hawrot
- */
-public class HttpSynchClientWorkerTest {
-
-    private final TestHttpClientListener listner = new TestHttpClientListener();
-
-    private static class LocalEmitter implements Emitter {
-
-        private SessionMapper sessionMapper;
-
-        private boolean started = false;
-
-        @Override
-        public void setSessionMapper(SessionMapper sessionMapper) {
-            this.sessionMapper = sessionMapper;
-        }
-
-        @Override
-        public SessionMapper getSessionMapper() {
-            return sessionMapper;
-        }
-
-        @Override
-        public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
-            LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
-            try {
-                handler.channelRegistered(channelContext);
-                handler.channelActive(channelContext);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        protected void flush(LocalChannelContext channelContext) {
-            SessionInfo sessionInfo = channelContext.getSessionInfo();
-            HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) channelContext.handler;
-            FlowContext flowContext = clientWorker.flowContext(sessionInfo);
-            HttpSessionPayloadEvent event = (HttpSessionPayloadEvent) flowContext.sentEvent();
-
-            HttpRequest request = event.getRequest();
-            HttpResponse response = event.getResponse();
-            ByteBuff buff = new HeapByteBuff();
-            HttpResponseEncoder.INSTANCE.encode(response, buff);
-            try {
-                clientWorker.dataReceived(channelContext, buff);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        protected void close(LocalChannelContext channelContext) {
-            try {
-                channelContext.handler.channelInactive(channelContext);
-                channelContext.handler.channelUnregistered(channelContext);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        @Override
-        public boolean isStarted() {
-            return started;
-        }
-
-        @Override
-        public void start() {
-            if(sessionMapper == null) {
-                sessionMapper = new PassThroughSessionMapper();
-            }
-
-            started = true;
-        }
-
-        @Override
-        public void stop() {
-            started = false;
-        }
-
-        @Override
-        public boolean isCollectMetrics() {
-            return false;
-        }
-
-        @Override
-        public void setCollectMetrics(boolean collectMetrics) {
-        }
-
-        @Override
-        public void writeMetrics(MetricsContainer container) {
-        }
-
-        @Override
-        public void configure(Configuration config, ConfigurationContext context) {
-        }
-    }
-
-    private static class LocalChannelContext implements ChannelContext {
-
-        private final LocalEmitter emitter;
-
-        private final EmitterHandler handler;
-
-        private final SessionInfo sessionInfo;
-
-        private final Queue<ByteBuffer> dataQueue;
-
-        private SocketAddress localAddress;
-
-        private SocketAddress remoteAddress;
-
-        public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
-            this.emitter = emitter;
-            this.handler = handler;
-            this.remoteAddress = remoteAddress;
-            this.sessionInfo = sessionInfo;
-            this.dataQueue = new LinkedList<>();
-        }
-
-        @Override
-        public boolean isConnected() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public boolean isConnectionPending() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        private void addToQeueu(ByteBuffer buffer) throws IOException {
-            dataQueue.add(buffer);
-        }
-
-        @Override
-        public void write(byte[] data, int offset, int length) throws IOException {
-            addToQeueu(ByteBuffer.wrap(data, offset, length));
-        }
-
-        @Override
-        public void write(ByteBuff data) throws IOException {
-            addToQeueu(data.toNioByteBuffer());
-        }
-
-        @Override
-        public void flush() throws IOException {
-            emitter.flush(this);
-        }
-
-        @Override
-        public void close() throws IOException {
-            emitter.close(this);
-        }
-
-        @Override
-        public SocketAddress getLocalAddress() {
-            return localAddress;
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress() {
-            return remoteAddress;
-        }
-
-        @Override
-        public SessionInfo getSessionInfo() {
-            return sessionInfo;
-        }
-
-    }
-
-    @AfterMethod
-    public void afterMethod() {
-        listner.clear();
-    }
-
-    private HttpSynchClientWorker createWorker() {
-        LocalEmitter emitter = new LocalEmitter();
-        HttpSynchClientWorker worker = new HttpSynchClientWorker(emitter, "test", 0);
-        worker.addListener(listner);
-        return worker;
-    }
-
-    private List<Event> readEvents(String pcapFile) throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
-        return EventUtils.readEvents(pcapFile, props);
-    }
-
-    @Test(enabled = false)
-    public void testHandle_SimpleRequestResponse() throws Exception {
-        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
-        assertEquals(4, events.size());
-
-        HttpSynchClientWorker worker = createWorker();
-        worker.start();
-        SessionEvent sessionEvent = (SessionEvent) events.get(0);
-        worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
-        events.forEach(worker::handle);
-        worker.join();
-
-        assertHttpClientEvents(events, listner.events());
-    }
-
-    @Test(enabled = false)
-    public void testHandle_SimpleRequestResponse_ConnectPartialSession() throws Exception {
-        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
-        assertEquals(4, events.size());
-
-        HttpSynchClientWorker worker = createWorker();
-        worker.setConnectPartialSession(true);
-        worker.start();
-        events.forEach(worker::handle);
-        worker.join();
-
-        assertHttpClientEvents(events, listner.events());
-    }
-}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Thu May 30 10:26:34 2019 +0200
@@ -1,26 +1,23 @@
 package com.passus.st.reader.nc;
 
-import static com.passus.data.DataSourceUtils.toByteBuff;
 import com.passus.data.HeapByteBuff;
-import com.passus.net.http.HttpConsts;
-import com.passus.net.http.HttpHeaders;
-import com.passus.net.http.HttpHeadersImpl;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpRequestBuilder;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.HttpResponseBuilder;
+import com.passus.net.http.*;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.http.HttpReqResp;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
-import static com.passus.st.utils.HttpMessageAssert.*;
+import org.testng.annotations.Test;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
+
+import static com.passus.data.DataSourceUtils.toByteBuff;
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
+import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.Test;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class HttpSessionPayloadEventDataWriterTest {
@@ -56,7 +53,7 @@
         HttpSessionPayloadEventDataWriter payloadWriter = new HttpSessionPayloadEventDataWriter();
         try (NcDataBlockWriter writer = new NcDataBlockWriter(file)) {
             writer.open();
-            payloadWriter.write(new HttpSessionPayloadEvent(session, req, resp, ""), writer);
+            payloadWriter.write(new SessionPayloadEvent(session, req, resp, HTTP, ""), writer);
         }
     }
 
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Thu May 30 10:26:34 2019 +0200
@@ -4,7 +4,6 @@
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.nc.NcDataBlock;
 import com.passus.st.reader.nc.NcSegmentBlock;
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Wed May 29 15:03:59 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Thu May 30 10:26:34 2019 +0200
@@ -1,7 +1,6 @@
 package com.passus.st.source;
 
 import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.commons.utils.ResourceUtils;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.DataEvents.DataEnd;