changeset 956:b0867db5ea27

Refactorization in progress
author Devel 2
date Wed, 29 May 2019 12:32:29 +0200
parents ecc66b501b16
children 8fc6d1cae116
files stress-tester/pom.xml stress-tester/src/main/java/com/passus/st/PcapScanner.java stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/AbstractClient.java stress-tester/src/main/java/com/passus/st/client/AbstractFlow.java stress-tester/src/main/java/com/passus/st/client/Client.java stress-tester/src/main/java/com/passus/st/client/ClientX.java stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/EventBusListener.java stress-tester/src/main/java/com/passus/st/client/EventHandler.java stress-tester/src/main/java/com/passus/st/client/Flow.java stress-tester/src/main/java/com/passus/st/client/FlowBased.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/ReqRespPair.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java 
stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowConst.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowDispatcher.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowParams.java stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/extractor/YamlExtractor.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpCsrfFormFilter.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMessagePredicate.java stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java stress-tester/src/main/java/com/passus/st/job/TestJob.java stress-tester/src/main/java/com/passus/st/metric/MetricSource.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/option/DefaultValueCoderResolver.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java stress-tester/src/test/java/com/passus/st/PcapScannerTest.java 
stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java stress-tester/src/test/java/com/passus/st/utils/Assert.java stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java
diffstat 57 files changed, 2249 insertions(+), 255 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/pom.xml	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/pom.xml	Wed May 29 12:32:29 2019 +0200
@@ -105,6 +105,12 @@
         </dependency>
 
         <dependency>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+            <version>8.2.2</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.passus</groupId>
             <artifactId>passus-lookup</artifactId>
             <version>1.0-SNAPSHOT</version>
--- a/stress-tester/src/main/java/com/passus/st/PcapScanner.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/PcapScanner.java	Wed May 29 12:32:29 2019 +0200
@@ -6,7 +6,7 @@
 import com.passus.net.http.session.HttpSessionAnalyzer;
 import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.source.PcapSessionEventSource;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -19,6 +19,7 @@
 import java.util.stream.Collectors;
 
 import static com.passus.st.Main.printHelp;
+import static com.passus.st.Protocols.HTTP;
 import static com.passus.st.utils.CliUtils.option;
 
 /**
@@ -74,8 +75,8 @@
 
     @Override
     public void handle(Event event) {
-        if (event instanceof HttpSessionPayloadEvent) {
-            Object extracted = extractor.extract((HttpSessionPayloadEvent) event);
+        if (event instanceof SessionPayloadEvent) {
+            Object extracted = extractor.extract((SessionPayloadEvent) event);
             if (extracted != null) {
                 values.add(extracted.toString());
 //                System.out.println(extracted.toString());
@@ -89,23 +90,26 @@
 
     public interface Extractor {
 
-        public Object extract(HttpSessionPayloadEvent event);
+        Object extract(SessionPayloadEvent event);
     }
 
     static class PostExtractor implements Extractor {
 
         @Override
-        public Object extract(HttpSessionPayloadEvent event) {
-            HttpRequest request = event.getRequest();
-            ByteString contentTypeBs = request.getHeaders().get(HttpHeaders.CONTENT_TYPE);
-            HttpMethod method = request.getMethod();
+        public Object extract(SessionPayloadEvent event) {
+            if (event.getProtocolId() == HTTP) {
+                HttpRequest request = (HttpRequest) event.getRequest();
+                ByteString contentTypeBs = request.getHeaders().get(HttpHeaders.CONTENT_TYPE);
+                HttpMethod method = request.getMethod();
 
-            if (HttpMethod.POST.equals(method) && contentTypeBs != null) {
-                String contentType = contentTypeBs.toString();
-                if (contentType.equals("application/x-www-form-urlencoded")) {
-                    return request;
+                if (HttpMethod.POST.equals(method) && contentTypeBs != null) {
+                    String contentType = contentTypeBs.toString();
+                    if (contentType.equals("application/x-www-form-urlencoded")) {
+                        return request;
+                    }
                 }
             }
+
             return null;
         }
 
@@ -177,9 +181,9 @@
         }
     }
 
-    private static Object urlExtract(HttpSessionPayloadEvent event, URLExtractor ue) {
+    private static Object urlExtract(SessionPayloadEvent event, URLExtractor ue) {
         try {
-            return ue.extract(URL.parse(event.getRequest().getUrl()));
+            return ue.extract(URL.parse(((HttpRequest) event.getRequest()).getUrl()));
         } catch (MalformedURLException ex) {
             ex.printStackTrace(System.err);
             return null;
@@ -203,7 +207,7 @@
 
         switch (type) {
             case "url":
-                return (e) -> e.getRequest().getUrl();
+                return (e) -> ((HttpRequest) e.getRequest()).getUrl();
             case "path":
                 return (e) -> urlExtract(e, URL::getPath);
             case "query":
@@ -213,9 +217,9 @@
             case "reqPost":
                 return new PostExtractor();
             case "reqHdr":
-                return (e) -> e.getRequest().getHeaders().get(arg);
+                return (e) -> ((HttpRequest) e.getRequest()).getHeaders().get(arg);
             case "respHdr":
-                return (e) -> e.getResponse().getHeaders().get(arg);
+                return (e) -> ((com.passus.net.http.HttpResponse) e.getResponse()).getHeaders().get(arg);
             default:
                 throw new ParseException("Invalid extractor spec: " + spec);
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,22 @@
+package com.passus.st;
+
+public class Protocols {
+
+    public static final int UNKNOWN = 0;
+    public static final int HTTP = 1;
+    public static final int DNS = 2;
+
+    private Protocols() {
+    }
+
+    public static String protocolToString(int protocolId) {
+        switch (protocolId) {
+            case HTTP:
+                return "HTTP";
+            case DNS:
+                return "DNS";
+            default:
+                return "unknown";
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractClient.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,84 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.Emitter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractClient implements Client {
+
+    private static final boolean DEFAULT_COLLECT_METRICS = false;
+
+    private Emitter emitter;
+
+    private final List<ClientListener> listeners = new ArrayList<>();
+
+    private FlowFilterChain filterChain = new FlowFilterChain();
+
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
+
+    @Override
+    public Emitter getEmitter() {
+        return emitter;
+    }
+
+    @Override
+    public void setEmitter(Emitter emitter) {
+        this.emitter = emitter;
+    }
+
+    @Override
+    public void setListeners(Collection<ClientListener> listeners) {
+        Assert.notContainsNull(listeners, "listeners");
+        this.listeners.addAll(listeners);
+    }
+
+    @Override
+    public void addListener(ClientListener listener) {
+        Assert.notNull(listener, "listener");
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(ClientListener listener) {
+        Assert.notNull(listener, "listener");
+        listeners.remove(listener);
+    }
+
+    @Override
+    public List<ClientListener> getListeners() {
+        return Collections.unmodifiableList(listeners);
+    }
+
+    @Override
+    public List<FlowFilter> getFilters() {
+        return filterChain.getFilters();
+    }
+
+    @Override
+    public void addFilter(FlowFilter filter) {
+        Assert.notNull(filter, "filter");
+        filterChain.addFilter(filter);
+    }
+
+    @Override
+    public void setFilters(Collection<FlowFilter> filters) {
+        Assert.notContainsNull(filters, "filters");
+        filterChain.clear();
+        filters.forEach((filter) -> filterChain.addFilter(filter));
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlow.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,99 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.Emitter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractFlow implements Flow {
+
+    protected final Logger logger = LogManager.getLogger(getClass());
+
+    private static final boolean DEFAULT_COLLECT_METRICS = false;
+
+    protected Emitter emitter;
+
+    private final List<ClientListener> listeners = new ArrayList<>();
+
+    private FlowFilterChain filterChain = new FlowFilterChain();
+
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
+
+    protected ClientX client;
+
+    @Override
+    public ClientX getClient() {
+        return client;
+    }
+
+    @Override
+    public void setClient(ClientX client) {
+        this.client = client;
+    }
+
+    @Override
+    public Emitter getEmitter() {
+        return emitter;
+    }
+
+    @Override
+    public void setEmitter(Emitter emitter) {
+        this.emitter = emitter;
+    }
+
+    @Override
+    public void setListeners(Collection<ClientListener> listeners) {
+        Assert.notContainsNull(listeners, "listeners");
+        this.listeners.addAll(listeners);
+    }
+
+    @Override
+    public void addListener(ClientListener listener) {
+        Assert.notNull(listener, "listener");
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(ClientListener listener) {
+        Assert.notNull(listener, "listener");
+        listeners.remove(listener);
+    }
+
+    @Override
+    public List<ClientListener> getListeners() {
+        return Collections.unmodifiableList(listeners);
+    }
+
+    @Override
+    public List<FlowFilter> getFilters() {
+        return filterChain.getFilters();
+    }
+
+    @Override
+    public void addFilter(FlowFilter filter) {
+        Assert.notNull(filter, "filter");
+        filterChain.addFilter(filter);
+    }
+
+    @Override
+    public void setFilters(Collection<FlowFilter> filters) {
+        Assert.notContainsNull(filters, "filters");
+        filterChain.clear();
+        filters.forEach((filter) -> filterChain.addFilter(filter));
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/Client.java	Wed May 29 12:32:29 2019 +0200
@@ -5,15 +5,32 @@
 import com.passus.st.emitter.Emitter;
 import com.passus.st.metric.MetricSource;
 
+import java.util.Collection;
+import java.util.List;
+
 /**
  * @author Mirosław Hawrot
  */
 public interface Client extends EventHandler, MetricSource, Service, Configurable {
 
-    public Emitter getEmitter();
+    Emitter getEmitter();
 
-    public void setEmitter(Emitter emitter);
+    void setEmitter(Emitter emitter);
 
-    public void join() throws InterruptedException;
+    void join() throws InterruptedException;
+
+    void setListeners(Collection<ClientListener> listeners);
+
+    void addListener(ClientListener listener);
+
+    void removeListener(ClientListener listener);
+
+    List<ClientListener> getListeners();
+
+    List<FlowFilter> getFilters();
+
+    void addFilter(FlowFilter filter);
+
+    void setFilters(Collection<FlowFilter> filters);
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientX.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,24 @@
+package com.passus.st.client;
+
+import com.passus.config.Configurable;
+import com.passus.st.metric.MetricSource;
+
+public interface ClientX extends MetricSource, Configurable {
+
+    int getProtocolId();
+
+    void init(FlowContext flowContext);
+
+    ClientXDataDecoder getResponseDecoder(FlowContext flowContext);
+
+    ClientXDataEncoder getRequestEncoder(FlowContext flowContext);
+
+    default void onDataWriteStart(FlowContext flowContext) {
+
+    }
+
+    default void onDataWriteEnd(FlowContext flowContext) {
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,19 @@
+package com.passus.st.client;
+
+import com.passus.data.ByteBuff;
+
+public interface ClientXDataDecoder<T> {
+
+    T getResult();
+
+    int state();
+
+    String getLastError();
+
+    default void clear(FlowContext flowContext) {
+
+    }
+    
+    int decode(ByteBuff buffer, FlowContext flowContext);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,9 @@
+package com.passus.st.client;
+
+import com.passus.data.ByteBuff;
+
+public interface ClientXDataEncoder<T> {
+
+    void encode(T request, FlowContext flowContext, ByteBuff out);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,7 @@
+package com.passus.st.client;
+
+public interface ClientXFactory {
+
+    ClientX create(int protocolId);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,18 @@
+package com.passus.st.client;
+
+import com.passus.st.client.http.HttpClientX;
+
+import static com.passus.st.Protocols.HTTP;
+
+public class ClientXFactoryImpl implements ClientXFactory {
+
+    @Override
+    public ClientX create(int protocolId) {
+        switch (protocolId) {
+            case HTTP:
+                return new HttpClientX();
+        }
+
+        return null;
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/EventBusListener.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/EventBusListener.java	Wed May 29 12:32:29 2019 +0200
@@ -6,9 +6,9 @@
  */
 public interface EventBusListener {
 
-    public void eventDropped(Event event);
+    void eventDropped(Event event);
 
-    public void eventQueued(Event event);
+    void eventQueued(Event event);
 
-    public void eventProcessed(Event event);
+    void eventProcessed(Event event);
 }
--- a/stress-tester/src/main/java/com/passus/st/client/EventHandler.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/EventHandler.java	Wed May 29 12:32:29 2019 +0200
@@ -6,6 +6,6 @@
  */
 public interface EventHandler {
 
-    public void handle(Event event);
+    void handle(Event event);
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/Flow.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,36 @@
+package com.passus.st.client;
+
+import com.passus.config.Configurable;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.metric.MetricSource;
+
+import java.util.Collection;
+import java.util.List;
+
+public interface Flow extends EventHandler, MetricSource, Configurable {
+
+    ClientX getClient();
+
+    void setClient(ClientX client);
+
+    Emitter getEmitter();
+
+    void setEmitter(Emitter emitter);
+
+    void join() throws InterruptedException;
+
+    void setListeners(Collection<ClientListener> listeners);
+
+    void addListener(ClientListener listener);
+
+    void removeListener(ClientListener listener);
+
+    List<ClientListener> getListeners();
+
+    List<FlowFilter> getFilters();
+
+    void addFilter(FlowFilter filter);
+
+    void setFilters(Collection<FlowFilter> filters);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowBased.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,4 @@
+package com.passus.st.client;
+
+public abstract class FlowBased extends AbstractFlow {
+}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Wed May 29 12:32:29 2019 +0200
@@ -3,7 +3,6 @@
 import com.passus.commons.Assert;
 import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
@@ -42,6 +41,9 @@
 
     private int loop;
 
+    private ClientX client;
+
+    @Deprecated
     protected DataDecoder decoder;
 
     private Map<String, Object> params;
@@ -58,6 +60,14 @@
         this.channelContext = channelContext;
     }
 
+    public ClientX client() {
+        return client;
+    }
+
+    public void client(ClientX client) {
+        this.client = client;
+    }
+
     public void state(int state) {
         this.state = state;
     }
@@ -98,6 +108,7 @@
         this.sentEvent = sentEvent;
     }
 
+    @Deprecated
     public DataDecoder decoder() {
         return decoder;
     }
@@ -126,7 +137,7 @@
         return timeout != -1 && System.currentTimeMillis() > timeout;
     }
 
-    public void setSentEvent(HttpSessionPayloadEvent sentEvent) {
+    public void setSentEvent(SessionPayloadEvent sentEvent) {
         this.sentEvent = sentEvent;
     }
 
@@ -180,6 +191,7 @@
     public void clear() {
         buffer = null;
         sentEvent = null;
+        timeout = -1;
 
         if (params != null) {
             params.clear();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,73 @@
+package com.passus.st.client;
+
+import com.passus.st.emitter.SessionInfo;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.passus.st.client.FlowContext.STATE_DISCONNECTED;
+
+public class FlowUtils {
+
+    public static final Map<Integer, Long> DEFAULT_TIMEOUTS;
+
+    static {
+        Map<Integer, Long> defaultTimeouts = new HashMap<>();
+        defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L);
+        defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L);
+        defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L);
+        defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L);
+        defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L);
+        defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L);
+        defaultTimeouts.put(STATE_DISCONNECTED, 0L);
+        DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
+    }
+
+    private FlowUtils() {
+    }
+
+
+    public static void trace(Logger log, FlowContext flowContext, String message, Object... args) {
+        log(log, flowContext, Level.TRACE, message, args);
+    }
+
+    public static void debug(Logger log, FlowContext flowContext, String message, Throwable cause) {
+        log(log, flowContext, Level.DEBUG, message, cause);
+    }
+
+    public static void error(Logger log, FlowContext flowContext, String message, Throwable cause) {
+        log(log, flowContext, Level.ERROR, message, cause);
+    }
+
+    public static void log(Logger log, FlowContext flowContext, Level level, String message, Throwable cause) {
+        message = String.format("%s [%s]", message, flowContext.sessionInfo());
+        log.log(level, message, cause);
+    }
+
+    public static final void debug(Logger log, FlowContext flowContext, String message, Object... args) {
+        log(log, flowContext, Level.DEBUG, message, args);
+    }
+
+    /**
+     * Logs {@code message} at {@code level} with the flow's session info appended in brackets.
+     *
+     * @param message text to log; expanded with {@link String#format} when {@code args} is
+     *                non-empty, any Log4j "{}" placeholders left are then filled from {@code args}
+     * @param args    values for the placeholders in {@code message}; may be empty
+     */
+    public static void log(Logger log, FlowContext flowContext, Level level, String message, Object... args) {
+        if (args.length > 0) {
+            message = String.format(message, args);
+        }
+
+        // Append the session as text instead of through a trailing "{}" parameter: for
+        // %-style messages that placeholder was filled with args[0] and the session was lost.
+        // Messages that use "{}" placeholders render exactly as before.
+        SessionInfo session = flowContext.sessionInfo();
+        log.log(level, message + " [" + session + "]", args);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,168 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.st.client.http.HttpClientWorkerMetric;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class FlowWorker extends Thread implements EmitterHandler, MetricSource, TimeAware {
+
+    protected final Logger logger = LogManager.getLogger(getClass());
+
+    protected final int index;
+
+    private ClientListener listener;
+
+    protected FlowFilterChain filterChain = new FlowFilterChain();
+
+    protected final Emitter emitter;
+
+    protected boolean collectMetric;
+
+    protected HttpClientWorkerMetric metric;
+
+    protected boolean connectPartialSession = false;
+
+    protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    protected ClientXFactory clientFactory;
+
+    public FlowWorker(Emitter emitter, String name, int index) {
+        super(name + index);
+        Assert.notNull(emitter, "emitter");
+        this.emitter = emitter;
+        this.index = index;
+    }
+
+    public boolean isConnectPartialSession() {
+        return connectPartialSession;
+    }
+
+    public void setConnectPartialSession(boolean connectPartialSession) {
+        this.connectPartialSession = connectPartialSession;
+    }
+
+    public abstract boolean isWorking();
+
+    public int index() {
+        return index;
+    }
+
+    public FlowFilterChain filterChain() {
+        return filterChain;
+    }
+
+    public void setFilterChain(FlowFilterChain filterChain) {
+        Assert.notNull(filterChain, "filterChain");
+        this.filterChain = filterChain;
+    }
+
+    protected final void fireResponseReceived(Object request, Object response, FlowContext context) {
+        if (listener != null) {
+            listener.responseReceived(request, response, context);
+        }
+    }
+
+    public ClientListener getListener() {
+        return listener;
+    }
+
+    public void setListener(ClientListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return metric != null;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        if (collectMetrics && metric == null) {
+            metric = new HttpClientWorkerMetric();
+            metric.activate();
+            collectMetric = true;
+        } else if (!collectMetrics && metric != null) {
+            metric.deactivate();
+            collectMetric = false;
+            metric = null;
+        }
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetric) {
+            synchronized (metric) {
+                container.update(System.currentTimeMillis(), metric);
+                metric.reset();
+            }
+        }
+    }
+
+    public abstract int activeConnections();
+
+    public abstract void close();
+
+    public abstract void close(SessionInfo session);
+
+    public abstract void handle(Event event);
+
+    protected final void trace(FlowContext flowContext, String message, Object... args) {
+        log(flowContext, Level.TRACE, message, args);
+    }
+
+    protected final void debug(FlowContext flowContext, String message, Throwable cause) {
+        log(flowContext, Level.DEBUG, message, cause);
+    }
+
+    protected final void error(FlowContext flowContext, String message, Throwable cause) {
+        log(flowContext, Level.ERROR, message, cause);
+    }
+
+    protected final void log(FlowContext flowContext, Level level, String message, Throwable cause) {
+        message = String.format("%s [%s]", message, flowContext.sessionInfo());
+        logger.log(level, message, cause);
+    }
+
+    protected final void debug(FlowContext flowContext, String message, Object... args) {
+        log(flowContext, Level.DEBUG, message, args);
+    }
+
+    /**
+     * Logs {@code message} at {@code level} with the flow's session info appended in brackets.
+     *
+     * @param message text to log; expanded with {@link String#format} when {@code args} is
+     *                non-empty, any Log4j "{}" placeholders left are then filled from {@code args}
+     * @param args    values for the placeholders in {@code message}; may be empty
+     */
+    protected final void log(FlowContext flowContext, Level level, String message, Object... args) {
+        if (args.length > 0) {
+            message = String.format(message, args);
+        }
+
+        // Append the session as text instead of through a trailing "{}" parameter: for
+        // %-style messages that placeholder was filled with args[0] and the session was lost.
+        // Messages that use "{}" placeholders render exactly as before.
+        SessionInfo session = flowContext.sessionInfo();
+        logger.log(level, message + " [" + session + "]", args);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,563 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.data.HeapByteBuff;
+import com.passus.filter.Filter;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricsContainer;
+import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2LongMap;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.passus.st.client.FlowContext.*;
+import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
+
+public abstract class FlowWorkerBased extends FlowWorker {
+
+    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
+
+    protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>();
+
+    private final Set<SessionInfo> blockedSessions = new HashSet<>();
+
+    private final Int2LongMap timeouts = new Int2LongArrayMap();
+
+    protected final Object lock = new Object();
+
+    protected volatile boolean working = false;
+
+    private long checkTimeoutsPeriod = 5_000;
+
+    private long nextCheckTimeoutsTime = -1;
+
+    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+
+    private long lastEventTimestamp = -1;
+
+    public FlowWorkerBased(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+        timeouts.putAll(DEFAULT_TIMEOUTS);
+    }
+
+    @Override
+    public boolean isWorking() {
+        return working;
+    }
+
+    @Override
+    public int activeConnections() {
+        // Number of registered flows whose state is anything but STATE_DISCONNECTED.
+        synchronized (lock) {
+            int active = 0;
+            for (FlowContext candidate : sessions.values()) {
+                boolean counted = candidate.state() != STATE_DISCONNECTED;
+                active += counted ? 1 : 0;
+            }
+
+            return active;
+        }
+    }
+
+    protected final void addBlockedSession(SessionInfo session) {
+        blockedSessions.add(session);
+    }
+
+    protected final boolean isBlockedSession(SessionInfo session) {
+        return !blockedSessions.isEmpty() && blockedSessions.contains(session);
+    }
+
+    public float getSleepFactor() {
+        return sleepFactor;
+    }
+
+    public void setSleepFactor(float sleepFactor) {
+        Assert.greaterOrEqualZero(sleepFactor, "sleepFactor");
+        this.sleepFactor = sleepFactor;
+    }
+
+    public long getCheckTimeoutsPeriod() {
+        return checkTimeoutsPeriod;
+    }
+
+    public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) {
+        Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod");
+        this.checkTimeoutsPeriod = checkTimeoutsPeriod;
+    }
+
+    protected final void changeFlowState(FlowContext flowContext, int state) {
+        try { // any exception raised below is only logged at debug level (see the catch at the end)
+            if (flowContext.state() == state) { // already in the requested state: nothing to do
+                return;
+            }
+
+            int oldState = flowContext.state(); // reported to flowStateChanged once the transition is done
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Flow status changing {} -> {}.",
+                        contextStateToString(flowContext.state()),
+                        contextStateToString(state)
+                );
+            }
+            // Side effects tied to the target state run before the state field itself is updated.
+            switch (state) {
+                case FlowContext.STATE_CONNECTING:
+                    flowContext.clear();
+                    break;
+                case FlowContext.STATE_CONNECTED:
+                    flowContext.client().init(flowContext);
+                    flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); // fresh buffer for every connection
+                    break;
+                case FlowContext.STATE_ERROR:
+                    changeFlowState(flowContext, STATE_DISCONNECTED); // an error is followed at once by a move to STATE_DISCONNECTED
+                    break; // NOTE(review): the common tail below still runs afterwards and stores STATE_ERROR — confirm intended
+                case FlowContext.STATE_RESP_RECEIVED:
+                    flowContext.sentEvent(null); // the request/response exchange is over
+                    flowContext.receivedStartTimestamp(-1); // -1 marks "no response data seen yet" (see dataReceived)
+                    break;
+                case FlowContext.STATE_DISCONNECTING:
+                    if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
+                        if (flowContext.channelContext() != null) {
+                            try {
+                                flowContext.channelContext().close();
+                            } catch (Exception e) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug(e.getMessage(), e);
+                                }
+                            }
+                        } else {
+                            changeFlowState(flowContext, STATE_DISCONNECTED); // no channel to close: go straight to disconnected
+                        }
+                    } else {
+                        return; // state is not below STATE_DISCONNECTING: leave the flow untouched
+                    }
+                    break;
+                case STATE_DISCONNECTED:
+                    flowContext.state(STATE_DISCONNECTED);
+                    flowContext.clear();
+                    removeFlowContext(flowContext);
+                    flowStateChanged(flowContext, oldState);
+                    return; // terminal state: the timeout/state tail below is skipped
+            }
+            // Common tail for every case that ended with break: arm the timeout, store the state, notify.
+            long timeout = timeouts.get(flowContext.state()); // NOTE(review): looked up with the state read before the update below, not with the new state — confirm
+            flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
+            flowContext.state(state);
+            flowStateChanged(flowContext, oldState);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e); // failures are logged and not rethrown
+        }
+    }
+
+    protected void flowStateChanged(FlowContext context, int oldState) {
+
+    }
+
+    protected FlowContext flowContext(SessionEvent event) {
+        return flowContext(event.getSessionInfo());
+    }
+
+    protected FlowContext flowContext(ChannelContext context) {
+        return flowContext(context.getSessionInfo());
+    }
+
+    protected FlowContext flowContext(SessionInfo session) {
+        // Plain map lookup; a miss is reported at debug level and yields null.
+        FlowContext found = sessions.get(session);
+        boolean reportMiss = found == null && logger.isDebugEnabled();
+        if (reportMiss) {
+            logger.debug("Context for session '" + session + "' not found.");
+        }
+
+        return found;
+    }
+
+    protected FlowContext createFlowContext(SessionInfo session) {
+        return new FlowContext(session);
+    }
+
+    protected FlowContext register(SessionEvent sessionEvent) {
+        return register(sessionEvent.getSessionInfo());
+    }
+
+    protected FlowContext register(SessionInfo session) {
+        synchronized (lock) {
+            // A session can be registered only once; a duplicate is rejected with null.
+            if (!sessions.containsKey(session)) {
+                FlowContext created = createFlowContext(session);
+                sessions.put(session, created);
+                return created;
+            }
+            logger.warn("Unable to register session '" + session + "'. Session already registered.");
+            return null;
+        }
+    }
+
+    protected FlowContext connect(SessionEvent sessionEvent) {
+        return connect(sessionEvent.getSessionInfo());
+    }
+
+    protected FlowContext connect(SessionInfo session) {
+        synchronized (lock) {
+            try {
+                FlowContext flowContext = register(session); // null when the session is already registered
+                if (flowContext != null) {
+                    emitter.connect(session, this, index); // this worker and its index are handed to the emitter
+                    return flowContext;
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e); // NOTE(review): a context registered above stays in sessions when connect fails — confirm intended
+            }
+            return null; // duplicate session or failed connect
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            // Ask every flow to disconnect; a failure on one flow does not stop the loop.
+            for (FlowContext current : sessions.values()) {
+                try {
+                    closeSession(current);
+                } catch (Exception failure) {
+                    if (logger.isDebugEnabled()) {
+                        debug(current, failure.getMessage(), failure);
+                    }
+                }
+            }
+
+            sessions.clear();
+            working = false;
+        }
+    }
+
+    protected void close(SessionEvent sessionEvent) {
+        close(sessionEvent.getSessionInfo());
+    }
+
+    protected void close(FlowContext flowContext) {
+        synchronized (lock) {
+            try {
+                closeSession(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close(SessionInfo session) {
+        synchronized (lock) {
+            try {
+                FlowContext flowContext = flowContext(session);
+                closeSession(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    protected void closeSession(FlowContext flowContext) {
+        synchronized (lock) {
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
+            }
+        }
+    }
+
+    protected void removeFlowContext(FlowContext flowContext) {
+        synchronized (lock) {
+            debug(flowContext, "removeFlowContext");
+            sessions.remove(flowContext.sessionInfo());
+        }
+    }
+
+    protected void reconnect(FlowContext flowContext) {
+        synchronized (lock) {
+            try {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state()));
+                }
+
+                SessionInfo session = flowContext.sessionInfo();
+                changeFlowState(flowContext, FlowContext.STATE_CONNECTING);
+                emitter.connect(session, this, index);
+            } catch (Exception e) {
+                error(flowContext, e.getMessage(), e);
+            }
+        }
+    }
+
+    protected void closeAllConnections() {
+        synchronized (lock) {
+            for (FlowContext flowContext : sessions.values()) {
+                closeSession(flowContext);
+            }
+        }
+    }
+
+    private void sleepSilently(long millis) { // pauses the calling thread; 0 is a no-op, negative values are only warned about
+        if (millis == 0) {
+            return;
+        }
+        if (millis < 0) {
+            logger.warn("Cannot sleep for negative interval: {}.", millis); // the placeholder needs the value as an argument
+            return;
+        }
+        logger.debug("Going sleep for: {}.", millis);
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ignore) { // deliberately swallowed: an interrupt just shortens the pause
+        }
+    }
+
+    protected void sleep(Event event) {
+        if (sleepFactor == SLEEP_FACTOR_NO_SLEEP) {
+            return; // pacing disabled; the last timestamp is not tracked in that case
+        }
+        if (lastEventTimestamp != -1) { // -1 means no earlier event has been seen
+            sleepSilently((long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor));
+        }
+        lastEventTimestamp = event.getTimestamp();
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        synchronized (lock) {
+            super.writeMetrics(container);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                            context.getLocalAddress(),
+                            context.getRemoteAddress());
+                }
+
+                flowContext.channelContext(context);
+                changeFlowState(flowContext, STATE_CONNECTED);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel inactive.");
+                }
+
+                changeFlowState(flowContext, STATE_DISCONNECTED);
+            }
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context); // null when the session is unknown; then only notifyAll runs
+            try {
+                if (flowContext != null) {
+                    ClientX client = flowContext.client();
+                    ClientXDataDecoder decoder = client.getResponseDecoder(flowContext);
+                    decoder.decode(data, flowContext); // feed the received chunk to the response decoder
+                    long now = timeGenerator.currentTimeMillis();
+                    if (flowContext.receivedStartTimestamp() == -1) { // first chunk of this response
+                        flowContext.receivedStartTimestamp(now);
+                    }
+
+                    if (decoder.state() == DataDecoder.STATE_ERROR) {
+                        if (logger.isDebugEnabled()) {
+                            debug(flowContext, "Decoder error. " + decoder.getLastError());
+                        }
+
+                        decoder.clear(flowContext);
+                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); // a decode error still ends the exchange
+                    } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                        Object resp = decoder.getResult();
+                        Object req = null; // stays null when no sent event is recorded on the flow
+                        if (flowContext.sentEvent() != null) {
+                            req = flowContext.sentEvent().getRequest();
+                        }
+
+                        if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { // listeners are skipped when the filter denies
+                            try {
+                                fireResponseReceived(req, resp, flowContext);
+                            } catch (Exception e) {
+                                error(flowContext, e.getMessage(), e); // a listener failure does not block the state change below
+                            }
+                        }
+
+                        decoder.clear(flowContext);
+                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                    } // any other decoder state: no state change in this call
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, e.getMessage(), e);
+                }
+            }
+
+            lock.notifyAll(); // wake threads waiting on lock, whatever happened above
+        }
+    }
+
+    @Override
+    public void dataWriteStart(ChannelContext context) {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null && flowContext.sentEvent() != null) {
+                long now = timeGenerator.currentTimeMillis();
+                flowContext.sendStartTimestamp(now);
+                flowContext.client().onDataWriteStart(flowContext);
+            }
+        }
+    }
+
+    @Override
+    public void dataWritten(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null && flowContext.sentEvent() != null) {
+                long now = timeGenerator.currentTimeMillis();
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
+                    }
+                }
+
+                flowContext.client().onDataWriteEnd(flowContext);
+            }
+
+            lock.notifyAll();
+
+        }
+    }
+
+    @Override
+    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occured. " + cause.getMessage(), cause);
+        }
+
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_ERROR);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
+        synchronized (lock) {
+            Object req = event.getRequest();
+            if (req != null) { // an event without a request is not sent; the method returns false
+                if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
+                    return false; // rejected by the outbound filter chain
+                }
+
+                ClientX client = flowContext.client();
+                ClientXDataEncoder encoder = client.getRequestEncoder(flowContext);
+                ByteBuff buffer = flowContext.buffer();
+                encoder.encode(req, flowContext, buffer); // encode the request into the flow's buffer
+
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.incRequestsNum();
+                        metric.addRequestSize(flowContext.buffer().readableBytes()); // measured before the write below
+                    }
+                }
+
+                try {
+                    changeFlowState(flowContext, FlowContext.STATE_REQ_SENT); // state is switched before the write is issued
+                    flowContext.sentEvent(event);
+                    flowContext.channelContext().writeAndFlush(buffer);
+                    buffer.clear(); // the buffer is reused for the next request
+                    return true;
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, e.getMessage(), e);
+                    }
+                } // on failure control falls through to the final return false
+            }
+        }
+
+        return false;
+    }
+
+    protected void processTimeouts() { // scans all flows for expired timeouts, at most once per checkTimeoutsPeriod
+        synchronized (lock) {
+            try {
+                long now = timeGenerator.currentTimeMillis();
+                if (nextCheckTimeoutsTime == -1) { // first call only schedules the first scan
+                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                } else if (nextCheckTimeoutsTime <= now) { // scan only once the scheduled time has been reached
+                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                    for (FlowContext flowContext : sessions.values()) {
+                        if (flowContext.timeouted()) {
+                            if (logger.isDebugEnabled()) {
+                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
+                                        flowContext.sessionInfo(),
+                                        contextStateToString(flowContext.state()));
+                            }
+                            // Live or failed flows are closed; flows already going down are just dropped.
+                            switch (flowContext.state()) {
+                                case FlowContext.STATE_CONNECTING:
+                                case FlowContext.STATE_CONNECTED:
+                                case FlowContext.STATE_REQ_SENT:
+                                case FlowContext.STATE_ERROR:
+                                    closeSession(flowContext);
+                                    break;
+                                case FlowContext.STATE_RESP_RECEIVED:
+                                    // Odd case; should not occur.
+                                    break;
+                                case FlowContext.STATE_DISCONNECTING:
+                                case STATE_DISCONNECTED:
+                                    removeFlowContext(flowContext);
+                                    break;
+                            }
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    protected Event eventInstanceForWorker(Event event) {
+        if (!(event instanceof SessionEvent)) {
+            return event; // non-session events are passed through unchanged
+        }
+        // Session events are replaced by their per-worker instance, keeping the timestamp.
+        Event workerEvent = ((SessionEvent) event).instanceForWorker(index);
+        workerEvent.setTimestamp(event.getTimestamp());
+        return workerEvent;
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,36 @@
+package com.passus.st.client;
+
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+public class NullFlowWorker extends FlowWorker { // FlowWorker in which every overridden operation does nothing
+
+    public NullFlowWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    @Override
+    public boolean isWorking() {
+        return false; // never reports itself as working
+    }
+
+    @Override
+    public int activeConnections() {
+        return 0; // no connections are ever opened by this class
+    }
+
+    @Override
+    public void close() {
+        // Intentionally empty: nothing to close.
+    }
+
+    @Override
+    public void close(SessionInfo session) {
+        // Intentionally empty: no session is tracked.
+    }
+
+    @Override
+    public void handle(Event event) {
+        // Intentionally empty: events are discarded.
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,304 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+
+public class ParallelFlowWorker extends FlowWorkerBased {
+
+    public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
+
+    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
+
+    private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS;
+
+    private long eventsQueueWaitTime = 100;
+
+    private final Semaphore semaphore = new Semaphore(maxSentRequests);
+
+    private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>();
+
+    private boolean closeAllConnections = false;
+
+    public ParallelFlowWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    public int getMaxSentRequests() {
+        return maxSentRequests;
+    }
+
+    public void setMaxSentRequests(int maxSentRequests) {
+        Assert.greaterThanZero(maxSentRequests, "maxSentRequests");
+        this.maxSentRequests = maxSentRequests; // NOTE(review): the final semaphore keeps the permit count it was created with — confirm intended
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(SessionInfo session) {
+        return (LocalFlowContext) super.flowContext(session);
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(ChannelContext context) {
+        return flowContext(context.getSessionInfo());
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(SessionEvent event) {
+        return flowContext(event.getSessionInfo());
+    }
+
+    public long getEventsQueueWaitTime() {
+        return eventsQueueWaitTime;
+    }
+
+    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
+        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
+        this.eventsQueueWaitTime = eventsQueueWaitTime;
+    }
+
+    @Override
+    protected void removeFlowContext(FlowContext flowContext) {
+        if (flowContext != null) {
+            flowIndex.remove(flowContext); // NOTE(review): super.removeFlowContext is not called, so the sessions map keeps the entry — confirm intended
+        }
+    }
+
+    @Override
+    protected LocalFlowContext createFlowContext(SessionInfo session) {
+        LocalFlowContext flowContext = new LocalFlowContext(session);
+        flowIndex.add(flowContext);
+        return flowContext;
+    }
+
+    private void waitCloseAllConnections() {
+        closeAllConnections = true; // makes flowStateChanged close idle flows on their next state change
+        synchronized (lock) {
+            while (!flowIndex.isEmpty()) { // NOTE(review): nothing here starts a close itself; a flow with no further state change would keep this loop waiting — confirm
+                try {
+                    lock.wait(10); // releases lock while waiting, re-checks every 10 ms or on notify
+                } catch (InterruptedException ignore) {
+                }
+            }
+        }
+
+        closeAllConnections = false;
+    }
+
+    @Override
+    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
+        // Check whether too many connections are open. If so, close the least
+        // recently used idle ones (never the flow that is about to send).
+        if (flowIndex.size() > maxSentRequests) {
+            int diff = flowIndex.size() - maxSentRequests;
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Too many connections {}.", flowIndex.size());
+            }
+            // Pick victims first: closing may remove entries from flowIndex mid-iteration.
+            List<LocalFlowContext> idle = new ArrayList<>(diff);
+            Iterator<LocalFlowContext> it = flowIndex.descendingIterator();
+            while (it.hasNext() && idle.size() < diff) {
+                LocalFlowContext indexFlowContext = it.next();
+                if (indexFlowContext != flowContext && indexFlowContext.eventsQueue.isEmpty()
+                        && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                    idle.add(indexFlowContext);
+                }
+            }
+            for (LocalFlowContext victim : idle) {
+                close(victim);
+            }
+        }
+        return super.send(flowContext, event);
+    }
+
+    private boolean canSend(FlowContext flowContext) {
+        int state = flowContext.state();
+        return (state == FlowContext.STATE_CONNECTED
+                || state == FlowContext.STATE_RESP_RECEIVED
+                || state == FlowContext.STATE_ERROR
+                || state == FlowContext.STATE_REQ_SENT);
+    }
+
+    @Override
+    protected void flowStateChanged(FlowContext flowContext, int oldState) {
+        LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
+        if (oldState == FlowContext.STATE_REQ_SENT) { // a request left the in-flight state: give its permit back
+            if (semaphore.availablePermits() <= maxSentRequests) { // NOTE(review): with <= a release is still allowed when the permits already equal maxSentRequests — confirm the bound
+                semaphore.release();
+            }
+        }
+
+        if (closeAllConnections) { // set while waitCloseAllConnections is running
+            if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
+                    && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                    && localFlowContext.eventsQueue.isEmpty()) {
+                close(flowContext); // idle flow with nothing queued: close it now
+                return;
+            }
+        }
+
+        if (localFlowContext.state() >= FlowContext.STATE_CONNECTED
+                && localFlowContext.state() < FlowContext.STATE_DISCONNECTING
+                && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                && !localFlowContext.eventsQueue.isEmpty()) {
+            // The flow can take work and has queued events: handle the head of its queue.
+            Event event = localFlowContext.eventsQueue.peek();
+            if (event.getType() == SessionStatusEvent.TYPE) {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                    localFlowContext.eventsQueue.poll();
+                    close((SessionStatusEvent) event);
+                } // a status event with any other status is left at the head of the queue
+            } else if (event.getType() == SessionPayloadEvent.TYPE
+                    && canSend(flowContext)) {
+                localFlowContext.eventsQueue.poll();
+                send(flowContext, (SessionPayloadEvent) event);
+            } else {
+                localFlowContext.eventsQueue.poll(); // anything else is discarded
+            }
+        }
+    }
+
+    private void makeFirst(LocalFlowContext flowContext) {
+        synchronized (lock) {
+            flowIndex.remove(flowContext);
+            flowIndex.addFirst(flowContext);
+        }
+    }
+
+    private void addToQueue(LocalFlowContext flowContext, Event event) {
+        flowContext.eventsQueue.add(event);
+        makeFirst(flowContext);
+    }
+
+    @Override
+    public void handle(Event event) {
+        Event newEvent = null; // stays null for event types not listed below, which are then ignored
+        switch (event.getType()) {
+            case SessionPayloadEvent.TYPE:
+                semaphore.acquireUninterruptibly(); // blocks the caller until a permit is free
+                newEvent = eventInstanceForWorker(event);
+                break;
+            case SessionStatusEvent.TYPE:
+            case DataEvents.DataLoopEnd.TYPE:
+            case DataEvents.DataEnd.TYPE:
+                newEvent = event; // these are queued as they are, without taking a permit
+        }
+
+        if (newEvent != null) {
+            synchronized (lock) {
+                try {
+                    eventsQueue.put(newEvent);
+                    lock.notifyAll(); // wake the run() loop waiting on lock
+                } catch (Exception e) {
+                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    private void processEvent(Event event) { // dispatches one event taken from eventsQueue (called from run())
+        if (event instanceof SessionEvent) {
+            switch (event.getType()) {
+                case SessionStatusEvent.TYPE:
+                    SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { // other statuses are ignored
+                        LocalFlowContext flowContext = flowContext((SessionEvent) event);
+                        if (flowContext != null) {
+                            if (flowContext.eventsQueue.isEmpty()
+                                    && flowContext.state() != FlowContext.STATE_REQ_SENT) {
+                                close(statusEvent); // nothing pending: close right away
+                            } else {
+                                addToQueue(flowContext, event); // close after the pending work (see flowStateChanged)
+                            }
+                        }
+                    }
+                    break;
+                case SessionPayloadEvent.TYPE: {
+                    SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
+                    LocalFlowContext flowContext = flowContext(payloadEvent);
+                    if (flowContext != null) {
+                        if (flowContext.state() >= FlowContext.STATE_CONNECTING
+                                && flowContext.state() < FlowContext.STATE_DISCONNECTING) {
+                            if (flowContext.eventsQueue.isEmpty()
+                                    && (flowContext.state() == FlowContext.STATE_CONNECTED
+                                    || flowContext.state() == FlowContext.STATE_ERROR
+                                    || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) {
+                                send(flowContext, payloadEvent); // flow is free: send immediately
+                            } else {
+                                addToQueue(flowContext, event); // keep per-flow ordering behind queued events
+                            }
+                        } // NOTE(review): outside that state range the event is dropped here, with no release of the permit taken in handle() — confirm
+                    } else {
+                        try {
+                            SessionInfo session = payloadEvent.getSessionInfo();
+                            flowContext = (LocalFlowContext) register(session); // unknown session: create its flow
+                            addToQueue(flowContext, event); // the event waits in the flow's queue while connecting
+                            emitter.connect(session, this, index);
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+
+                    break;
+                }
+            }
+
+        } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataLoopEnd received.");
+            }
+
+            waitCloseAllConnections(); // returns only once flowIndex is empty
+            filterChain.reset();
+        } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataEnd received. Deactivation.");
+            }
+
+            working = false; // makes the run() loop exit
+        }
+
+    }
+
+    @Override
+    public void run() {
+        synchronized (lock) { // held for the whole loop; released only inside lock.wait
+            working = true;
+            while (working) {
+                try {
+                    try {
+                        lock.wait(eventsQueueWaitTime); // woken by notifyAll or after eventsQueueWaitTime ms
+                    } catch (InterruptedException ignore) {
+                    }
+
+                    Event event;
+                    while ((event = eventsQueue.poll()) != null) { // drain everything queued so far
+                        processEvent(event);
+                    }
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e); // a failing event does not end the loop
+                    }
+                }
+            }
+        }
+    }
+
+    protected static class LocalFlowContext extends FlowContext {
+
+        private final Queue<Event> eventsQueue;
+
+        private LocalFlowContext(SessionInfo session) {
+            super(session);
+            eventsQueue = new LinkedList<>();
+        }
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ReqRespPair.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,20 @@
+package com.passus.st.client;
+
+/**
+ * Simple holder pairing a request object with a response object.
+ *
+ * @param <R> request type
+ * @param <S> response type
+ */
+public class ReqRespPair<R, S> {
+    final R request;
+    final S response;
+
+    public ReqRespPair(R request, S response) {
+        this.request = request;
+        this.response = response;
+    }
+
+    public R getRequest() { return request; }
+    public S getResponse() { return response; }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Wed May 29 12:32:29 2019 +0200
@@ -3,20 +3,44 @@
 import com.passus.st.emitter.SessionInfo;
 
 /**
- *
  * @author Mirosław Hawrot
  */
-public abstract class SessionPayloadEvent<T> extends SessionEvent {
+public class SessionPayloadEvent<R, S> extends SessionEvent {
 
-    private final T payload;
+    public static final int TYPE = 12;
 
-    public SessionPayloadEvent(SessionInfo sessionInfo, T payload, String sourceName) {
+    private final R request;
+
+    private final S response;
+
+    private final int protocolId;
+
+    public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName) {
         super(sessionInfo, sourceName);
-        this.payload = payload;
+        this.request = request;
+        this.response = response;
+        this.protocolId = protocolId;
     }
 
-    public T getPayload() {
-        return payload;
+    @Override
+    public int getType() {
+        return TYPE;
     }
 
+    public int getProtocolId() {
+        return protocolId;
+    }
+
+    public R getRequest() {
+        return request;
+    }
+
+    public S getResponse() {
+        return response;
+    }
+
+    @Override
+    public SessionPayloadEvent<R, S> instanceForWorker(int index) { // index is unused: the copy does not depend on the worker
+        // New event sharing session info, request, response, protocol id and source name. NOTE(review): no timestamp is set here - confirm.
+        return new SessionPayloadEvent<>(getSessionInfo(), request, response, protocolId, getSourceName());
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,288 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.passus.st.client.FlowContext.contextStateToString;
+
+public class SynchFlowWorker extends FlowWorkerBased {
+
+    public static final String TYPE = "synch";
+
+    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
+
+    private long eventsQueueWaitTime = 100;
+
+    /**
+     * Context for which an operation is currently being performed.
+     */
+    private FlowContext currFlowContext;
+
+    private boolean loopEnd = false;
+
+    private int loop = 0;
+
+    public SynchFlowWorker(Emitter emitter, String name, int index) { // all arguments go straight to FlowWorkerBased
+        super(emitter, name, index);
+    }
+
+    @Override
+    public boolean isWorking() {
+        return working; // run() sets it to true; pollNext() sets it to false on a DataEnd event
+    }
+
+    public long getEventsQueueWaitTime() { // timeout in milliseconds handed to lock.wait() in run()
+        return eventsQueueWaitTime;
+    }
+
+    public void setEventsQueueWaitTime(long eventsQueueWaitTime) { // timeout in milliseconds used by run()
+        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime"); // presumably rejects values <= 0 - confirm against Assert
+        this.eventsQueueWaitTime = eventsQueueWaitTime;
+    }
+
+    @Override
+    public void sessionInvalidated(SessionInfo session) throws Exception {
+        synchronized (lock) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Session {} invalidated.", session);
+            }
+            // If a flow is registered for the session, move it to the DISCONNECTING state.
+            FlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
+            }
+            // pollNext() skips queued session events whose session is blocked.
+            addBlockedSession(session);
+            lock.notifyAll(); // wake any thread waiting on the lock, e.g. in run()
+        }
+    }
+
+    @Override
+    protected void flowStateChanged(FlowContext context, int oldState) {
+        final boolean current = (context == currFlowContext);
+        if (logger.isDebugEnabled()) {
+            logger.debug("flowStateChanged {},{}", current, contextStateToString(context.state()));
+        }
+        if (!current) {
+            return; // only the context of the pending operation is tracked here
+        }
+        final int state = context.state();
+        if (state == FlowContext.STATE_CONNECTED || state == FlowContext.STATE_RESP_RECEIVED
+                || state == FlowContext.STATE_ERROR || state == FlowContext.STATE_DISCONNECTED) {
+            currFlowContext = null; // pollNext() polls again only while no context is pending
+        }
+    }
+
+    @Override
+    public void handle(Event event) { // queues the instance returned by eventInstanceForWorker(event)
+        Event newEvent = eventInstanceForWorker(event);
+        synchronized (lock) {
+            try {
+                eventsQueue.put(newEvent);
+            } catch (Exception e) {
+                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
+            }
+            // Wake the thread waiting in run() so the queue is polled without waiting for the timeout.
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    protected void closeAllConnections() { // waits for sent requests, closes everything, then waits until sessions is empty
+        synchronized (lock) {
+            boolean wait; // true while at least one flow is still in STATE_REQ_SENT
+            do {
+                wait = false;
+                for (FlowContext flowContext : sessions.values()) {
+                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
+                        wait = true;
+                        break;
+                    }
+                }
+                // Re-check every 100 ms; lock.wait() releases the monitor while waiting.
+                if (wait) {
+                    try {
+                        lock.wait(100);
+                    } catch (Exception e) {
+                    }
+                }
+            } while (wait);
+            // NOTE(review): neither loop has an upper time bound, and exceptions from wait() (interrupts included) are ignored.
+            super.closeAllConnections();
+            while (!sessions.isEmpty()) {
+                try {
+                    lock.wait(100);
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    /**
+     * Polls one event from the queue and processes it; does nothing while a flow operation is still pending.
+     *
+     * @return true if the next event should be processed immediately, false if the caller should wait first
+     */
+    private boolean pollNext() {
+        if (currFlowContext != null) {
+            return false;
+        }
+
+        Event event = eventsQueue.poll();
+        if (event != null) {
+            sleep(event);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Event processing: {}", event);
+            }
+
+            if (event instanceof SessionEvent) {
+                SessionEvent sessEvent = (SessionEvent) event;
+                if (isBlockedSession(sessEvent.getSessionInfo())) {
+                    return true;
+                }
+
+                if (event.getType() == SessionStatusEvent.TYPE) {
+                    SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
+                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                        try {
+                            currFlowContext = register(statusEvent);
+                            if (currFlowContext != null) {
+                                currFlowContext.loop(loop);
+                                emitter.connect(statusEvent.getSessionInfo(), this, index);
+                            }
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                            currFlowContext = null; // same as the payload branches: do not stay pending after a failed connect
+                        }
+                        return (currFlowContext == null);
+                    } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                        currFlowContext = flowContext((SessionEvent) event);
+                        if (currFlowContext != null) {
+                            if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                                close(statusEvent);
+                            }
+                        }
+                    }
+
+                    return true;
+                } else if (event.getType() == SessionPayloadEvent.TYPE) {
+                    FlowContext flowContext = flowContext(sessEvent);
+                    if (flowContext != null) {
+                        switch (flowContext.state()) {
+                            case FlowContext.STATE_CONNECTED:
+                            case FlowContext.STATE_RESP_RECEIVED:
+                            case FlowContext.STATE_ERROR:
+                                currFlowContext = flowContext;
+                                if (send(flowContext, (SessionPayloadEvent) event)) {
+                                    return false;
+                                } else {
+                                    currFlowContext = null;
+                                    return true;
+                                }
+                            case FlowContext.STATE_DISCONNECTING:
+                            case FlowContext.STATE_DISCONNECTED:
+                                if (connectPartialSession) {
+                                    currFlowContext = register(sessEvent);
+                                    if (currFlowContext != null) {
+                                        try {
+                                            currFlowContext.loop(loop);
+                                            emitter.connect(sessEvent.getSessionInfo(), this, index);
+                                        } catch (IOException e) {
+                                            logger.error(e.getMessage(), e);
+                                            currFlowContext = null;
+                                        }
+                                    }
+
+                                    return false;
+                                } else {
+                                    return true;
+                                }
+                            default:
+                                return false;
+                        }
+                    } else if (connectPartialSession) {
+                        currFlowContext = register(sessEvent);
+                        if (currFlowContext != null) {
+                            try {
+                                currFlowContext.loop(loop);
+                                emitter.connect(sessEvent.getSessionInfo(), this, index);
+                                eventsQueue.addFirst(sessEvent);
+                            } catch (IOException e) {
+                                logger.error(e.getMessage(), e);
+                                currFlowContext = null;
+                            }
+
+                            return false;
+                        } else {
+                            return true;
+                        }
+                    }
+
+                    return true;
+                } else {
+                    return true;
+                }
+            } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataLoopEnd received.");
+                }
+                loopEnd = true;
+                try {
+                    closeAllConnections();
+                    filterChain.reset();
+                    loop++; // currFlowContext is always null here (guard at the top), so the worker's own counter is advanced
+                } finally {
+                    loopEnd = false; // always reset: run() skips polling while this flag is set
+                }
+                return true;
+            } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataEnd received. Deactivation.");
+                }
+                working = false;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            eventsQueue.clear(); // events that were not polled yet are dropped
+            super.close();
+            lock.notifyAll(); // wake any thread waiting on the lock
+        }
+    }
+
+    @Override
+    public void run() { // worker loop; the lock is held except while blocked in lock.wait()
+        synchronized (lock) {
+            working = true;
+            while (working) {
+                try {
+                    try {
+                        lock.wait(eventsQueueWaitTime); // ends early on notifyAll() from handle(), sessionInvalidated() or close()
+                    } catch (InterruptedException ignore) {
+                    }
+                    // Keep polling for as long as pollNext() reports that the next event can be processed at once.
+                    boolean nextPoll;
+                    do {
+                        if (loopEnd || !working) {
+                            break;
+                        }
+
+                        nextPoll = pollNext();
+                    } while (nextPoll);
+                } catch (Exception e) { // exceptions from pollNext() are only logged at debug level; the loop continues
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -6,7 +6,10 @@
 import com.passus.net.http.HttpResponse;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.*;
+import com.passus.st.client.Event;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.SessionEvent;
+import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
@@ -20,6 +23,7 @@
 /**
  * @author Mirosław Hawrot
  */
+@Deprecated
 @Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
 public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker {
 
@@ -386,15 +390,22 @@
         }
     }
 
-    private static final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> {
+    private static final class HttpResponseEvent extends SessionEvent {
 
         public static final int TYPE = 1012;
 
+        private final HttpResponse payload;
+
         public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) {
-            super(sessionInfo, payload, sourceName);
+            super(sessionInfo, sourceName);
+            this.payload = payload;
             setTimestamp(timestamp);
         }
 
+        public HttpResponse getPayload() {
+            return payload;
+        }
+
         @Override
         public SessionEvent instanceForWorker(int index) {
             throw new UnsupportedOperationException("Not supported yet.");
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -20,10 +20,10 @@
 import java.util.List;
 
 /**
- *
  * @author Mirosław Hawrot
  */
-public abstract class HttpClientWorker extends Thread implements EmitterHandler, MetricSource, Runnable {
+@Deprecated
+public abstract class HttpClientWorker extends Thread implements EmitterHandler, MetricSource {
 
     public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
 
@@ -97,7 +97,7 @@
         Assert.notNull(listener, "listener");
         this.listeners.remove(listener);
     }
-    
+
     @Override
     public boolean isCollectMetrics() {
         return metric != null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java	Wed May 29 12:32:29 2019 +0200
@@ -3,13 +3,12 @@
 import com.passus.st.client.Event;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface HttpClientWorkerDispatcher {
-    
-    public abstract HttpClientWorker find(Event event, HttpClientWorker[] workers);
 
-    public abstract int dispatch(Event event, HttpClientWorker[] workers);
-    
+    HttpClientWorker find(Event event, HttpClientWorker[] workers);
+
+    int dispatch(Event event, HttpClientWorker[] workers);
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java	Wed May 29 12:32:29 2019 +0200
@@ -1,15 +1,12 @@
 package com.passus.st.client.http;
 
 import com.passus.commons.plugin.PluginFactory;
-import com.passus.st.client.http.filter.HttpFilter;
-import com.passus.st.client.http.filter.HttpFilterFactory;
 import com.passus.st.plugin.PluginConstants;
 
 /**
- *
  * @author Mirosław Hawrot
  */
-public class HttpClientWorkerFactory extends PluginFactory<HttpClientWorker>  {
+public class HttpClientWorkerFactory extends PluginFactory<HttpClientWorker> {
 
     private static HttpClientWorkerFactory instance;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,97 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.net.http.HttpRequest;
+import com.passus.st.client.ClientX;
+import com.passus.st.client.ClientXDataDecoder;
+import com.passus.st.client.ClientXDataEncoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_END;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_START;
+
+/** HTTP implementation of {@link ClientX}: hands out the HTTP encoder/decoder and time-stamps sent requests. */
+public class HttpClientX implements ClientX, TimeAware {
+
+    private final HttpClientXDataDecoder decoder;
+    private final HttpClientXDataEncoder encoder;
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+    // The package-private fields here are read directly by HttpClientXDataDecoder.
+    boolean collectMetrics = false;
+    // NOTE(review): metric is never assigned in this class, so it stays null unless other code sets it.
+    HttpClientWorkerMetric metric;
+
+    public HttpClientX() {
+        decoder = new HttpClientXDataDecoder(this);
+        encoder = new HttpClientXDataEncoder();
+    }
+
+    @Override
+    public int getProtocolId() {
+        return HTTP;
+    }
+
+    @Override
+    public ClientXDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return decoder; // the single instance created in the constructor, whatever the flow
+    }
+
+    @Override
+    public ClientXDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder; // the single instance created in the constructor, whatever the flow
+    }
+
+    @Override
+    public void init(FlowContext flowContext) {
+        //TODO Fix this: HttpScopes holds global parameters
+        flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes()));
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        // Nothing is read from the configuration.
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics; // only the flag is stored; no metric object is created here
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        // Nothing is written to the container.
+    }
+
+    @Override
+    public void onDataWriteStart(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now); // assumes sentEvent() is non-null and holds an HttpRequest
+    }
+
+    @Override
+    public void onDataWriteEnd(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now); // same assumption as in onDataWriteStart
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,102 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.http.HttpFullMessageDecoder;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.ClientXDataDecoder;
+import com.passus.st.client.FlowContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+import static com.passus.st.client.http.HttpConsts.*;
+import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
+
+/** Response decoder of {@link HttpClientX}: decodes HTTP responses, tags them and records metrics. */
+public class HttpClientXDataDecoder implements ClientXDataDecoder<HttpResponse> {
+
+    private static final Logger LOGGER = LogManager.getLogger(HttpClientXDataDecoder.class);
+
+    private final HttpFullMessageDecoder decoder;
+
+    private final HttpClientX client;
+
+    /** Request of the flow's last sent event seen by decode(); reset by clear(). */
+    private HttpRequest lastRequest;
+
+    public HttpClientXDataDecoder(HttpClientX client) {
+        this.client = client;
+        decoder = new HttpFullMessageDecoder();
+        decoder.setDecodeRequest(false);
+    }
+
+    @Override
+    public HttpResponse getResult() {
+        return (HttpResponse) decoder.getResult();
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    /** Removes the conversation of the last request from the flow's HTTP scopes and clears the decoder. */
+    @Override
+    public void clear(FlowContext flowContext) {
+        if (lastRequest != null) {
+            extractHttpContext(flowContext).scopes().removeConversation(lastRequest);
+            lastRequest = null;
+        }
+
+        decoder.clear();
+    }
+
+    /** Decodes response bytes; a finished response is logged, added to the metrics (if enabled) and tagged. */
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        if (flowContext.sentEvent() != null) {
+            lastRequest = (HttpRequest) flowContext.sentEvent().getRequest();
+        }
+        if (lastRequest != null) {
+            decoder.setRequestMethod(lastRequest.getMethod());
+        }
+
+        int res = decoder.decode(buffer);
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            long now = client.timeGenerator.currentTimeMillis();
+            HttpResponse resp = (HttpResponse) decoder.getResult();
+            if (LOGGER.isDebugEnabled()) {
+                debug(LOGGER, flowContext, "Response decoded (size: {} B, downloaded: {} ms, status: {})",
+                        decoder.getHeaderSize() + decoder.getContentSize(),
+                        now - flowContext.receivedStartTimestamp(), resp.getStatus().getCode());
+            }
+            // client.metric may still be null (HttpClientX never assigns it): synchronizing on null would throw.
+            HttpClientWorkerMetric metric = client.metric;
+            if (client.isCollectMetrics() && metric != null) {
+                synchronized (metric) {
+                    metric.incResponsesNum();
+                    metric.addResponseStatusCode(resp.getStatus().getCode());
+                    metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
+                    metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+                    if (lastRequest != null) {
+                        metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
+                    }
+                }
+            }
+
+            resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize());
+            resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize());
+            resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp());
+            resp.setTag(TAG_TIME_END, now);
+        }
+
+        return res;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java	Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,26 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.st.client.ClientXDataEncoder;
+import com.passus.st.client.FlowContext;
+
+import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE;
+import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE;
+
+/** Request encoder of {@link HttpClientX}: writes an HTTP request and tags it with header/content sizes. */
+public class HttpClientXDataEncoder implements ClientXDataEncoder<HttpRequest> {
+    private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder();
+
+    /** Encodes headers, then content, into {@code out}; both sizes are measured on {@code out} itself. */
+    @Override
+    public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) {
+        long start = out.readableBytes(); // bytes already present in out do not belong to this request
+        reqEncoder.encodeHeaders(request, out);
+        long headerSize = out.readableBytes() - start;
+        reqEncoder.encodeContent(request, out);
+        request.setTag(TAG_HEADER_SIZE, headerSize);
+        request.setTag(TAG_CONTENT_SIZE, out.readableBytes() - start - headerSize);
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -21,33 +21,23 @@
 import com.passus.st.metric.MetricsContainer;
 import org.apache.logging.log4j.Level;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.passus.st.client.FlowContext.*;
+import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
 import static com.passus.st.client.http.HttpConsts.*;
 import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
-import static com.passus.st.client.http.filter.HttpFlowUtils.createFlowContext;
 
 /**
  * @author Mirosław Hawrot
  */
+@Deprecated
 public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware {
 
-    public static final Map<Integer, Long> DEFAULT_TIMEOUTS;
-
-    static {
-        Map<Integer, Long> defaultTimeouts = new HashMap<>();
-        defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L);
-        defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L);
-        defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L);
-        defaultTimeouts.put(STATE_DISCONNECTED, 0L);
-        DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
-    }
-
     protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>();
 
     private final Set<SessionInfo> blockedSessions = new HashSet<>();
@@ -538,7 +528,7 @@
     }
 
     @Override
-    public void errorOccured(ChannelContext context, Throwable cause) throws Exception {
+    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         if (logger.isDebugEnabled()) {
             logger.debug("Error occured. " + cause.getMessage(), cause);
         }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowConst.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowConst.java	Wed May 29 12:32:29 2019 +0200
@@ -4,4 +4,6 @@
 
     public static final String PARAM_HTTP_CONTEXT = "http.context";
 
+    private HttpFlowConst() {
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowDispatcher.java	Fri May 24 12:01:51 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,9 +0,0 @@
-package com.passus.st.client.http;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public interface HttpFlowDispatcher {
-    
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowParams.java	Fri May 24 12:01:51 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,6 +0,0 @@
-package com.passus.st.client.http;
-
-public class HttpFlowParams {
-
-    private HttpScopes scopes;
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -9,6 +9,7 @@
  *
  * @author Mirosław Hawrot
  */
+@Deprecated
 public class HttpNullClientWorker extends HttpClientWorker {
 
     private boolean working = true;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -20,6 +20,7 @@
 /**
  * @author Mirosław Hawrot
  */
+@Deprecated
 @Plugin(name = HttpParallelClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
 public final class HttpParallelClientWorker extends HttpFlowBasedClientWorker {
 
@@ -35,7 +36,7 @@
 
     private final Semaphore semaphore = new Semaphore(maxSentRequests);
 
-    private final Deque<LocalHttpFlowContext> flowIndex = new ArrayDeque<>();
+    private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>();
 
     private boolean closeAllConnections = false;
 
@@ -53,17 +54,17 @@
     }
 
     @Override
-    protected LocalHttpFlowContext flowContext(SessionInfo session) {
-        return (LocalHttpFlowContext) super.flowContext(session);
+    protected LocalFlowContext flowContext(SessionInfo session) {
+        return (LocalFlowContext) super.flowContext(session);
     }
 
     @Override
-    protected LocalHttpFlowContext flowContext(ChannelContext context) {
+    protected LocalFlowContext flowContext(ChannelContext context) {
         return flowContext(context.getSessionInfo());
     }
 
     @Override
-    protected LocalHttpFlowContext flowContext(SessionEvent event) {
+    protected LocalFlowContext flowContext(SessionEvent event) {
         return flowContext(event.getSessionInfo());
     }
 
@@ -84,8 +85,8 @@
     }
 
     @Override
-    protected LocalHttpFlowContext createFlowContext(SessionInfo session) {
-        LocalHttpFlowContext flowContext = new LocalHttpFlowContext(session, scopes);
+    protected LocalFlowContext createFlowContext(SessionInfo session) {
+        LocalFlowContext flowContext = new LocalFlowContext(session, scopes);
         flowIndex.add(flowContext);
         return flowContext;
     }
@@ -114,9 +115,9 @@
                 debug(flowContext, "Too many connections {}.", flowIndex.size());
             }
 
-            Iterator<LocalHttpFlowContext> it = flowIndex.descendingIterator();
+            Iterator<LocalFlowContext> it = flowIndex.descendingIterator();
             while (it.hasNext()) {
-                LocalHttpFlowContext indexFlowContext = it.next();
+                LocalFlowContext indexFlowContext = it.next();
                 if (indexFlowContext.eventsQueue.isEmpty()
                         && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
                     close(flowContext);
@@ -140,7 +141,7 @@
 
     @Override
     protected void flowStateChanged(FlowContext flowContext, int oldState) {
-        LocalHttpFlowContext localFlowContext = (LocalHttpFlowContext) flowContext;
+        LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
         if (oldState == FlowContext.STATE_REQ_SENT) {
             if (semaphore.availablePermits() <= maxSentRequests) {
                 semaphore.release();
@@ -178,14 +179,14 @@
         }
     }
 
-    private void makeFirst(LocalHttpFlowContext flowContext) {
+    private void makeFirst(LocalFlowContext flowContext) {
         synchronized (lock) {
             flowIndex.remove(flowContext);
             flowIndex.addFirst(flowContext);
         }
     }
 
-    private void addToQueue(LocalHttpFlowContext flowContext, Event event) {
+    private void addToQueue(LocalFlowContext flowContext, Event event) {
         flowContext.eventsQueue.add(event);
         makeFirst(flowContext);
     }
@@ -222,7 +223,7 @@
                 case SessionStatusEvent.TYPE:
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                     if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                        LocalHttpFlowContext flowContext = flowContext((SessionEvent) event);
+                        LocalFlowContext flowContext = flowContext((SessionEvent) event);
                         if (flowContext != null) {
                             if (flowContext.eventsQueue.isEmpty()
                                     && flowContext.state() != FlowContext.STATE_REQ_SENT) {
@@ -235,7 +236,7 @@
                     break;
                 case HttpSessionPayloadEvent.TYPE: {
                     HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
-                    LocalHttpFlowContext flowContext = flowContext(payloadEvent);
+                    LocalFlowContext flowContext = flowContext(payloadEvent);
                     if (flowContext != null) {
                         if (flowContext.state() >= FlowContext.STATE_CONNECTING
                                 && flowContext.state() < FlowContext.STATE_DISCONNECTING) {
@@ -251,7 +252,7 @@
                     } else {
                         try {
                             SessionInfo session = payloadEvent.getSessionInfo();
-                            flowContext = (LocalHttpFlowContext) register(session);
+                            flowContext = (LocalFlowContext) register(session);
                             addToQueue(flowContext, event);
                             emitter.connect(session, this, index);
                         } catch (Exception e) {
@@ -304,11 +305,11 @@
         }
     }
 
-    protected static class LocalHttpFlowContext extends FlowContext {
+    protected static class LocalFlowContext extends FlowContext {
 
         private final Queue<Event> eventsQueue;
 
-        private LocalHttpFlowContext(SessionInfo session, HttpScopes scopes) {
+        private LocalFlowContext(SessionInfo session, HttpScopes scopes) {
             super(session);
             setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(this, scopes));
             eventsQueue = new LinkedList<>();
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Wed May 29 12:32:29 2019 +0200
@@ -3,32 +3,21 @@
 import com.passus.net.http.HttpMessageHelper;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
-import com.passus.st.client.http.HttpReqResp;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 
+import static com.passus.st.Protocols.HTTP;
+
 /**
- *
  * @author Mirosław Hawrot
  */
-public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpReqResp> {
+@Deprecated
+public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpRequest, HttpResponse> {
 
     public static final int TYPE = 12;
 
     public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) {
-        this(sessionInfo, new HttpReqResp(req, resp), sourceName);
-    }
-
-    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload, String sourceName) {
-        super(sessionInfo, payload, sourceName);
-    }
-
-    public HttpRequest getRequest() {
-        return getPayload().getRequest();
-    }
-
-    public HttpResponse getResponse() {
-        return getPayload().getResponse();
+        super(sessionInfo, req, resp, HTTP, sourceName);
     }
 
     @Override
@@ -37,13 +26,6 @@
     }
 
     @Override
-    public HttpSessionPayloadEvent instanceForWorker(int index) {
-        HttpReqResp payload = getPayload();
-        HttpRequest reqCopy = payload.request == null ? null : new HttpRequest(payload.request);
-        return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName());
-    }
-
-    @Override
     public String toString() {
         String req = null;
         String resp = null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -11,6 +11,7 @@
  *
  * @author Mirosław Hawrot
  */
+@Deprecated
 public class HttpStresserWorker extends HttpClientWorker {
 
     private int maxRequests = -1;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -20,6 +20,7 @@
 /**
  * @author Mirosław Hawrot
  */
+@Deprecated
 @Plugin(name = HttpSynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
 public class HttpSynchClientWorker extends HttpFlowBasedClientWorker {
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/extractor/YamlExtractor.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/extractor/YamlExtractor.java	Wed May 29 12:32:29 2019 +0200
@@ -1,15 +1,14 @@
 package com.passus.st.client.http.extractor;
 
+import org.yaml.snakeyaml.Yaml;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.yaml.snakeyaml.Yaml;
-
 /**
  * @author norbert.rostkowski
  */
@@ -40,7 +39,6 @@
         }
     }
 
-
     @Override
     public CharSequence extract(CharSequence content) throws IOException {
         Yaml yaml = new Yaml();
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpCsrfFormFilter.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpCsrfFormFilter.java	Wed May 29 12:32:29 2019 +0200
@@ -100,7 +100,7 @@
                             LOGGER.debug("Could not find request parameter {}", parameters);
                         }
                     } catch (IOException ex) {
-                        LOGGER.debug("Could not decode request.");
+                        LOGGER.debug("Could not decode request.");
                     }
                 }
             }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMessagePredicate.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMessagePredicate.java	Wed May 29 12:32:29 2019 +0200
@@ -7,13 +7,12 @@
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.http.HttpFlowContext;
 import com.passus.st.filter.Transformers;
+
 import java.io.IOException;
 import java.util.function.Predicate;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class HttpMessagePredicate implements Predicate<HttpMessage> {
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Wed May 29 12:32:29 2019 +0200
@@ -3,43 +3,42 @@
 import com.passus.data.ByteBuff;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface EmitterHandler {
 
-    public default void channelRegistered(ChannelContext context) throws Exception {
-
-    }
-
-    public default void channelUnregistered(ChannelContext context) throws Exception {
-
-    }
-
-    public default void channelActive(ChannelContext context) throws Exception {
-
-    }
-
-    public default void channelInactive(ChannelContext context) throws Exception {
+    default void channelRegistered(ChannelContext context) throws Exception {
 
     }
 
-    public default void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-    }
-
-    public default void dataWriteStart(ChannelContext context) {
+    default void channelUnregistered(ChannelContext context) throws Exception {
 
     }
 
-    public default void dataWritten(ChannelContext context) throws Exception {
+    default void channelActive(ChannelContext context) throws Exception {
 
     }
 
-    public default void sessionInvalidated(SessionInfo session) throws Exception {
+    default void channelInactive(ChannelContext context) throws Exception {
 
     }
 
-    public default void errorOccured(ChannelContext context, Throwable cause) throws Exception {
+    default void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+    }
+
+    default void dataWriteStart(ChannelContext context) {
+
+    }
+
+    default void dataWritten(ChannelContext context) throws Exception {
+
+    }
+
+    default void sessionInvalidated(SessionInfo session) throws Exception {
+
+    }
+
+    default void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
 
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Wed May 29 12:32:29 2019 +0200
@@ -9,6 +9,8 @@
 import java.text.ParseException;
 import java.util.Objects;
 
+import static com.passus.st.Protocols.UNKNOWN;
+
 /**
  * @author Mirosław Hawrot
  */
@@ -20,6 +22,8 @@
 
     private final int transport;
 
+    private final int protocolId;
+
     private final IpAddress srcIp;
 
     private final int srcPort;
@@ -46,6 +50,10 @@
         this(srcSocket.getIp(), srcSocket.getPort(), dstSocket.getIp(), dstSocket.getPort(), transport);
     }
 
+    public SessionInfo(SocketAddress srcSocket, SocketAddress dstSocket, int transport, int protocolId) {
+        this(srcSocket.getIp(), srcSocket.getPort(), dstSocket.getIp(), dstSocket.getPort(), transport, protocolId);
+    }
+
     public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort) {
         this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort);
     }
@@ -54,13 +62,20 @@
         this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT);
     }
 
+
     public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport) {
-        this(srcIp, srcPort, dstIp, dstPort, transport, UniqueIdGenerator.generate());
+        this(srcIp, srcPort, dstIp, dstPort, transport, UNKNOWN);
     }
 
-    public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport, String sessionId) {
+    public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport, int protocolId) {
+        this(srcIp, srcPort, dstIp, dstPort, transport, protocolId, UniqueIdGenerator.generate());
+    }
+
+    public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort,
+                       int transport, int protocolId, String sessionId) {
         this.sessionId = sessionId;
         this.transport = transport;
+        this.protocolId = protocolId;
         this.srcPort = srcPort;
         this.dstPort = dstPort;
         this.srcIp = srcIp;
@@ -81,6 +96,10 @@
         return transport;
     }
 
+    public int getProtocolId() {
+        return protocolId;
+    }
+
     public String getSessionId() {
         return sessionId;
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java	Wed May 29 12:32:29 2019 +0200
@@ -317,7 +317,7 @@
         KeyContext keyContext = (KeyContext) key.attachment();
 
         try {
-            keyContext.handler.errorOccured(keyContext.channelContext, cause);
+            keyContext.handler.errorOccurred(keyContext.channelContext, cause);
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java	Wed May 29 12:32:29 2019 +0200
@@ -428,7 +428,7 @@
         KeyContext keyContext = (KeyContext) key.attachment();
 
         try {
-            keyContext.handler.errorOccured(keyContext.channelContext, cause);
+            keyContext.handler.errorOccurred(keyContext.channelContext, cause);
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Wed May 29 12:32:29 2019 +0200
@@ -205,12 +205,12 @@
             }
         });
 
-        Map<String, ValueExtractor> appVars = (Map<String, ValueExtractor>) context.get(
+        Map<String, ValueExtractor> appVars = context.get(
                 ConfigurationContextConsts.APP_VARS);
         if (appVars != null) {
             HttpVarsFilter httpVarsFilter = new HttpVarsFilter(appVars);
 
-            List<HttpFilter> filters = (List<HttpFilter>) context.get(
+            List<HttpFilter> filters = context.get(
                     ConfigurationContextConsts.HTTP_FILTERS);
             if (filters != null) {
                 filters.add(httpVarsFilter);
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java	Wed May 29 12:32:29 2019 +0200
@@ -1,15 +1,14 @@
 package com.passus.st.metric;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface MetricSource {
 
-    public boolean isCollectMetrics();
+    boolean isCollectMetrics();
 
-    public void setCollectMetrics(boolean collectMetrics);
+    void setCollectMetrics(boolean collectMetrics);
 
-    public void writeMetrics(MetricsContainer container);
+    void writeMetrics(MetricsContainer container);
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Wed May 29 12:32:29 2019 +0200
@@ -4,22 +4,16 @@
 import com.passus.data.ByteBuff;
 import com.passus.data.DataSource;
 import com.passus.data.HeapByteBuff;
-import com.passus.net.http.HttpConsts;
-import com.passus.net.http.HttpHeaderEntry;
-import com.passus.net.http.HttpHeaders;
-import com.passus.net.http.HttpMessage;
-import com.passus.net.http.HttpMethod;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.http.HttpReqResp;
+import com.passus.net.http.*;
 import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+
 import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST;
 import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE;
-import java.io.IOException;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class HttpSessionPayloadEventDataWriter {
@@ -153,29 +147,11 @@
         return size;
     }
 
-    public void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException {
-        ByteBuff buffer = new HeapByteBuff();
-        long size = encodeMessage(msg, buffer);
-        DataSource content = msg.getContent();
-        if (content != null) {
-            size += content.available();
-            size += ncDataHelper.writeLongVLC(buffer, content.available());
-        }
 
-        writer.writeSessionPayloadHeader(timestamp, session, (byte) 1);
-        writer.writeSessionPayloadData(buffer);
-        if (content != null) {
-            writer.writeSessionPayloadData(content);
-        }
-
-        writer.closeSessionPayloadBlock();
-    }
-
-    public void encodeFullMessages(long timestamp, SessionInfo session, HttpReqResp messages, NcDataBlockWriter writer) throws IOException {
+    public void encodeFullMessages(long timestamp, SessionInfo session, HttpRequest req, HttpResponse resp, NcDataBlockWriter writer) throws IOException {
         ByteBuff reqBuffer = new HeapByteBuff();
         ByteBuff respBuffer = new HeapByteBuff();
         byte flags = 0;
-        HttpRequest req = messages.getRequest();
         DataSource reqContent = null;
         if (reqWriteMode.code() > HttpWriteMode.SKIP.code()
                 && req != null) {
@@ -195,7 +171,6 @@
             }
         }
 
-        HttpResponse resp = messages.getResponse();
         DataSource respContent = null;
         if (respWriteMode.code() > HttpWriteMode.SKIP.code()
                 && resp != null) {
@@ -233,7 +208,7 @@
     public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException {
         long time = event.getTimestamp();
         SessionInfo session = event.getSessionInfo();
-        encodeFullMessages(time, session, event.getPayload(), writer);
+        encodeFullMessages(time, session, event.getRequest(), event.getResponse(), writer);
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/DefaultValueCoderResolver.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/DefaultValueCoderResolver.java	Wed May 29 12:32:29 2019 +0200
@@ -62,7 +62,7 @@
             return code;
         }
 
-        throw new IllegalArgumentException("Cannot encode object of type: " + cls.getSimpleName());
+        throw new IllegalArgumentException("Cannot encode object of type: " + cls.getSimpleName());
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Wed May 29 12:32:29 2019 +0200
@@ -10,14 +10,17 @@
 import com.passus.config.schema.NodeDefinition;
 import com.passus.config.schema.NodeDefinitionCreator;
 import com.passus.config.validation.LongValidator;
-import com.passus.data.*;
+import com.passus.data.ByteBuff;
+import com.passus.data.ByteBuffAllocator;
+import com.passus.data.ByteBuffDataSource;
+import com.passus.data.DefaultByteBuffAllocator;
 import com.passus.st.client.DataEvents;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
 import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.client.http.HttpReqResp;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
 import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader;
@@ -31,6 +34,7 @@
 import java.io.IOException;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+import static com.passus.st.Protocols.HTTP;
 
 /**
  * @author Mirosław Hawrot
@@ -237,7 +241,7 @@
                 sessionInfo.setSourceName(getName());
                 ByteBuff payload = readPayload(payloadBlock.data());
                 HttpReqResp messages = httpReader.decodeMessages(payload);
-                handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, getName()));
+                handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), HTTP, getName()));
                 break;
             default:
                 reader.read();
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Wed May 29 12:32:29 2019 +0200
@@ -8,8 +8,8 @@
 import com.passus.net.session.SessionKey;
 import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 
 import java.util.HashMap;
@@ -17,6 +17,7 @@
 
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE;
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED;
+import static com.passus.st.Protocols.HTTP;
 
 public class PcapHttpListener implements HttpSessionListener {
 
@@ -31,7 +32,7 @@
     private final PcapSessionEventSourceMetric metric;
 
     public PcapHttpListener(String sourceName, int maxSessionNum, EventHandler eventHandler,
-                             boolean collectMetric, PcapSessionEventSourceMetric metric) {
+                            boolean collectMetric, PcapSessionEventSourceMetric metric) {
         this.sourceName = sourceName;
         lastRequests = new HashMap<>(maxSessionNum);
         this.eventHandler = eventHandler;
@@ -43,10 +44,10 @@
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                context.getProtocol(), context.getId());
+                context.getProtocol(), HTTP, context.getId()); // order: transport, protocolId, sessionId
         info.setSourceName(sourceName);
 
-        Event event = new HttpSessionPayloadEvent(info, req, resp, sourceName);
+        Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName);
         event.setTimestamp(timestamp);
         eventHandler.handle(event);
         if (collectMetric) {
@@ -84,7 +85,7 @@
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                context.getProtocol(), context.getId());
+                context.getProtocol(), HTTP, context.getId()); // order: transport, protocolId, sessionId
         info.setSourceName(sourceName);
 
         Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName);
--- a/stress-tester/src/test/java/com/passus/st/PcapScannerTest.java	Fri May 24 12:01:51 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,41 +0,0 @@
-package com.passus.st;
-
-import com.passus.net.http.HttpHeaders;
-import org.apache.logging.log4j.Level;
-import org.testng.annotations.Test;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public class PcapScannerTest {
-
-    public static void main(String[] args) {
-        String ST = "C:\\Users\\mikolaj.podbielski\\Desktop\\hg\\stress-tester\\stress-tester\\";
-        PcapScanner.main(ST + "ndiag.pcap");
-//        scanAll();
-    }
-
-    public static void scanAll() {
-        try {
-            Log4jConfigurationFactory.enableFactory(Level.INFO);
-            PcapScanner.Extractor extractor = (event) -> event.getResponse().getHeaders().get(HttpHeaders.SET_COOKIE);
-            PcapScanner scanner = new PcapScanner(extractor);
-            scanAll(scanner);
-            System.out.println("\n --== Scanned values:");
-            scanner.getValues().forEach(System.out::println);
-        } catch (Exception e) {
-            e.printStackTrace(System.out);
-        }
-    }
-
-    private static void scanAll(PcapScanner scanner) throws InterruptedException {
-        String ST = "C:\\Users\\mikolaj.podbielski\\Desktop\\hg\\stress-tester\\stress-tester\\";
-        scanner.scan(ST + "ndiag.pcap", 8080);
-        scanner.scan(ST + "netim.pcap", 9190);
-        scanner.scan(ST + "arx.pcap", 80);
-        scanner.scan(ST + "http5b1.pcap", 80);
-        scanner.scan(ST + "basic_digest.pcap", 80);
-    }
-
-}
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Wed May 29 12:32:29 2019 +0200
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public final void errorOccured(ChannelContext context, Throwable cause) throws Exception {
+    public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context);
         event.setCause(cause);
         add(event);
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Wed May 29 12:32:29 2019 +0200
@@ -4,23 +4,22 @@
 import com.passus.data.ByteStringImpl;
 import com.passus.data.SliceByteString;
 import com.passus.st.client.Event;
-import com.passus.st.client.TestHttpClientListener;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.TestHttpClientListener.HttpClientEvent;
 import com.passus.st.client.TestHttpClientListener.HttpClientEventType;
 import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
-import com.passus.st.source.NcEventSourceTest;
-import static com.passus.st.utils.HttpMessageAssert.assertMessages;
-import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
+import org.testng.AssertJUnit;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.testng.AssertJUnit;
-import static org.testng.AssertJUnit.assertEquals;
+
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
+import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class Assert extends AssertJUnit {
@@ -42,14 +41,14 @@
 
     public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) {
         expectedEvents = expectedEvents.stream()
-                .filter((e) -> e.getType() == HttpSessionPayloadEvent.TYPE)
+                .filter((e) -> e.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) e).getProtocolId() == HTTP)
                 .collect(Collectors.toList());
 
         List<Event> events = new ArrayList<>(httpClientEvents.size());
         httpClientEvents.forEach((event) -> {
             if (event.getType() == HttpClientEventType.RESPONSE_RECEIVED) {
                 ResponseReceivedEvent respReceived = (ResponseReceivedEvent) event;
-                HttpSessionPayloadEvent payloadEvent = new HttpSessionPayloadEvent(null, respReceived.getRequest(), respReceived.getResponse(), null);
+                SessionPayloadEvent payloadEvent = new SessionPayloadEvent(null, respReceived.getRequest(), respReceived.getResponse(), HTTP, null);
                 events.add(payloadEvent);
             }
         });
@@ -64,9 +63,9 @@
             Event event = events.get(i);
 
             assertEquals(expectedEvent.getType(), event.getType());
-            if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-                HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent;
-                HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
+            if (event.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) event).getProtocolId() == HTTP) {
+                SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent;
+                SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
 
                 assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
                 assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Wed May 29 12:32:29 2019 +0200
@@ -1,6 +1,5 @@
 package com.passus.st.utils;
 
-import static com.passus.st.utils.Assert.*;
 import com.passus.data.ByteBuff;
 import com.passus.data.ByteString;
 import com.passus.data.DataSourceUtils;
@@ -8,11 +7,14 @@
 import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
+
 import java.util.List;
 import java.util.Set;
 
+import static com.passus.st.utils.Assert.assertEquals;
+import static com.passus.st.utils.Assert.fail;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class HttpMessageAssert {
@@ -20,6 +22,10 @@
     private HttpMessageAssert() {
     }
 
+    public static void assertMessages(Object expectedMsg, Object msg) {
+        assertMessages((HttpMessage) expectedMsg, (HttpMessage) msg);
+    }
+
     public static void assertMessages(HttpMessage expectedMsg, HttpMessage msg) {
         if (expectedMsg == msg) {
             return;
@@ -33,6 +39,10 @@
         }
     }
 
+    public static void assertMessagesContent(Object expectedMsg, Object msg) {
+        assertMessagesContent((HttpMessage) expectedMsg, (HttpMessage) msg);
+    }
+
     public static void assertMessagesContent(HttpMessage expectedMsg, HttpMessage msg) {
         if (expectedMsg == msg) {
             return;