changeset 963:3a4c9ca6593a

Refactorization in progress
author Devel 2
date Fri, 31 May 2019 10:53:30 +0200
parents e85498215df0
children d1e54d1d3e1e
files stress-tester/src/main/java/com/passus/st/client/ClientX.java stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/Flow.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataEncoder.java
diffstat 20 files changed, 319 insertions(+), 315 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ClientX.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,24 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.config.Configurable;
-import com.passus.st.metric.MetricSource;
-
-public interface ClientX extends MetricSource, Configurable {
-
-    int getProtocolId();
-
-    void init(FlowContext flowContext);
-
-    ClientXDataDecoder getResponseDecoder(FlowContext flowContext);
-
-    ClientXDataEncoder getRequestEncoder(FlowContext flowContext);
-
-    default void onDataWriteStart(FlowContext flowContext) {
-
-    }
-
-    default void onDataWriteEnd(FlowContext flowContext) {
-
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,19 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.data.ByteBuff;
-
-public interface ClientXDataDecoder<T> {
-
-    T getResult();
-
-    int state();
-
-    String getLastError();
-
-    default void clear(FlowContext flowContext) {
-
-    }
-    
-    int decode(ByteBuff buffer, FlowContext flowContext);
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,9 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.data.ByteBuff;
-
-public interface ClientXDataEncoder<T> {
-
-    void encode(T request, FlowContext flowContext, ByteBuff out);
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,7 +0,0 @@
-package com.passus.st.client;
-
-public interface ClientXFactory {
-
-    ClientX create(int protocolId);
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,18 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.st.client.http.HttpClientX;
-
-import static com.passus.st.Protocols.HTTP;
-
-public class ClientXFactoryImpl implements ClientXFactory {
-
-    @Override
-    public ClientX create(int protocolId) {
-        switch (protocolId) {
-            case HTTP:
-                return new HttpClientX();
-        }
-
-        throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/Flow.java	Fri May 31 09:27:03 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/Flow.java	Fri May 31 10:53:30 2019 +0200
@@ -9,9 +9,9 @@
 
 public interface Flow extends EventHandler, MetricSource, Configurable {
 
-    ClientX getClient();
+    FlowHandler getClient();
 
-    void setClient(ClientX client);
+    void setClient(FlowHandler client);
 
     Emitter getEmitter();
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Fri May 31 09:27:03 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Fri May 31 10:53:30 2019 +0200
@@ -41,7 +41,7 @@
 
     private int loop;
 
-    private ClientX client;
+    private FlowHandler client;
 
     @Deprecated
     protected DataDecoder decoder;
@@ -60,11 +60,11 @@
         this.channelContext = channelContext;
     }
 
-    public ClientX client() {
+    public FlowHandler client() {
         return client;
     }
 
-    public void client(ClientX client) {
+    public void client(FlowHandler client) {
         this.client = client;
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,35 @@
+package com.passus.st.client;
+
+import com.passus.config.Configurable;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.st.metric.MetricSource;
+
+/**
+ * TODO Dolecowo ma zastapic interfejs Client
+ */
+public interface FlowHandler extends MetricSource, Configurable {
+
+    boolean DEFAULT_COLLECT_METRICS = true;
+
+    int getProtocolId();
+
+    void init(FlowContext flowContext);
+
+    FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext);
+
+    FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext);
+
+    default void onDataWriteStart(FlowContext flowContext) {
+
+    }
+
+    default void onDataWriteEnd(FlowContext flowContext) {
+
+    }
+
+    @Override
+    default void configure(Configuration config, ConfigurationContext context) {
+        setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataDecoder.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,19 @@
+package com.passus.st.client;
+
+import com.passus.data.ByteBuff;
+
+public interface FlowHandlerDataDecoder<T> {
+
+    T getResult();
+
+    int state();
+
+    String getLastError();
+
+    default void clear(FlowContext flowContext) {
+
+    }
+    
+    int decode(ByteBuff buffer, FlowContext flowContext);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataEncoder.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,9 @@
+package com.passus.st.client;
+
+import com.passus.data.ByteBuff;
+
+public interface FlowHandlerDataEncoder<T> {
+
+    void encode(T request, FlowContext flowContext, ByteBuff out);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,7 @@
+package com.passus.st.client;
+
+public interface FlowHandlerFactory {
+
+    FlowHandler create(int protocolId);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,18 @@
+package com.passus.st.client;
+
+import com.passus.st.client.http.HttpFlowHandler;
+
+import static com.passus.st.Protocols.HTTP;
+
+public class FlowHandlerFactoryImpl implements FlowHandlerFactory {
+
+    @Override
+    public FlowHandler create(int protocolId) {
+        switch (protocolId) {
+            case HTTP:
+                return new HttpFlowHandler();
+        }
+
+        throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Fri May 31 09:27:03 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Fri May 31 10:53:30 2019 +0200
@@ -25,7 +25,7 @@
 
     protected final Emitter emitter;
 
-    protected ClientXFactory clientFactory = new ClientXFactoryImpl();
+    protected FlowHandlerFactory clientFactory = new FlowHandlerFactoryImpl();
 
     protected boolean collectMetric;
 
@@ -65,11 +65,11 @@
         this.filterChain = filterChain;
     }
 
-    public ClientXFactory getClientFactory() {
+    public FlowHandlerFactory getClientFactory() {
         return clientFactory;
     }
 
-    public void setClientFactory(ClientXFactory clientFactory) {
+    public void setClientFactory(FlowHandlerFactory clientFactory) {
         Assert.notNull(clientFactory, "clientFactory");
         this.clientFactory = clientFactory;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Fri May 31 09:27:03 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Fri May 31 10:53:30 2019 +0200
@@ -194,7 +194,7 @@
 
             FlowContext flowContext = createFlowContext(session);
             //TODO Malo optymalne
-            ClientX client = clientFactory.create(session.getProtocolId());
+            FlowHandler client = clientFactory.create(session.getProtocolId());
             client.init(flowContext);
             flowContext.client(client);
             sessions.put(session, flowContext);
@@ -381,8 +381,8 @@
             FlowContext flowContext = flowContext(context);
             try {
                 if (flowContext != null) {
-                    ClientX client = flowContext.client();
-                    ClientXDataDecoder decoder = client.getResponseDecoder(flowContext);
+                    FlowHandler client = flowContext.client();
+                    FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
                     decoder.decode(data, flowContext);
                     long now = timeGenerator.currentTimeMillis();
                     if (flowContext.receivedStartTimestamp() == -1) {
@@ -481,8 +481,8 @@
                     return false;
                 }
 
-                ClientX client = flowContext.client();
-                ClientXDataEncoder encoder = client.getRequestEncoder(flowContext);
+                FlowHandler client = flowContext.client();
+                FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
                 ByteBuff buffer = flowContext.buffer();
                 encoder.encode(req, flowContext, buffer);
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,97 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.time.TimeAware;
-import com.passus.commons.time.TimeGenerator;
-import com.passus.config.Configuration;
-import com.passus.config.ConfigurationContext;
-import com.passus.net.http.HttpRequest;
-import com.passus.st.client.ClientX;
-import com.passus.st.client.ClientXDataDecoder;
-import com.passus.st.client.ClientXDataEncoder;
-import com.passus.st.client.FlowContext;
-import com.passus.st.metric.MetricsContainer;
-
-import static com.passus.st.Protocols.HTTP;
-import static com.passus.st.client.http.HttpConsts.TAG_TIME_END;
-import static com.passus.st.client.http.HttpConsts.TAG_TIME_START;
-
-public class HttpClientX implements ClientX, TimeAware {
-
-    private final HttpClientXDataDecoder decoder;
-
-    private final HttpClientXDataEncoder encoder;
-
-    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
-
-    boolean collectMetrics = false;
-
-    HttpClientWorkerMetric metric;
-
-    public HttpClientX() {
-        decoder = new HttpClientXDataDecoder(this);
-        encoder = new HttpClientXDataEncoder();
-    }
-
-    @Override
-    public int getProtocolId() {
-        return HTTP;
-    }
-
-    @Override
-    public ClientXDataDecoder getResponseDecoder(FlowContext flowContext) {
-        return decoder;
-    }
-
-    @Override
-    public ClientXDataEncoder getRequestEncoder(FlowContext flowContext) {
-        return encoder;
-    }
-
-    @Override
-    public void init(FlowContext flowContext) {
-        //TODO Poprawic, w HttpScopes sa parametry globalne
-        flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes()));
-    }
-
-    @Override
-    public TimeGenerator getTimeGenerator() {
-        return timeGenerator;
-    }
-
-    @Override
-    public void setTimeGenerator(TimeGenerator timeGenerator) {
-        this.timeGenerator = timeGenerator;
-    }
-
-    @Override
-    public void configure(Configuration config, ConfigurationContext context) {
-
-    }
-
-    @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        this.collectMetrics = collectMetrics;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-
-    }
-
-    @Override
-    public void onDataWriteStart(FlowContext flowContext) {
-        long now = timeGenerator.currentTimeMillis();
-        ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now);
-    }
-
-    @Override
-    public void onDataWriteEnd(FlowContext flowContext) {
-        long now = timeGenerator.currentTimeMillis();
-        ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now);
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,102 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.data.ByteBuff;
-import com.passus.data.DataDecoder;
-import com.passus.net.http.HttpFullMessageDecoder;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.ClientXDataDecoder;
-import com.passus.st.client.FlowContext;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import static com.passus.st.client.FlowUtils.debug;
-import static com.passus.st.client.http.HttpConsts.*;
-import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
-
-public class HttpClientXDataDecoder implements ClientXDataDecoder<HttpResponse> {
-
-    private static final Logger LOGGER = LogManager.getLogger(HttpClientXDataDecoder.class);
-
-    private final HttpFullMessageDecoder decoder;
-
-    private final HttpClientX client;
-
-    private HttpRequest lastRequest;
-
-    public HttpClientXDataDecoder(HttpClientX client) {
-        this.client = client;
-        decoder = new HttpFullMessageDecoder();
-        decoder.setDecodeRequest(false);
-    }
-
-    @Override
-    public HttpResponse getResult() {
-        return (HttpResponse) decoder.getResult();
-    }
-
-    @Override
-    public int state() {
-        return decoder.state();
-    }
-
-    @Override
-    public String getLastError() {
-        return decoder.getLastError();
-    }
-
-    @Override
-    public void clear(FlowContext flowContext) {
-        if (lastRequest != null) {
-            extractHttpContext(flowContext).scopes().removeConversation(lastRequest);
-            lastRequest = null;
-        }
-
-        decoder.clear();
-    }
-
-    @Override
-    public int decode(ByteBuff buffer, FlowContext flowContext) {
-        if (flowContext.sentEvent() != null) {
-            lastRequest = (HttpRequest) flowContext.sentEvent().getRequest();
-        }
-
-        if (lastRequest != null) {
-            decoder.setRequestMethod(lastRequest.getMethod());
-        }
-
-        int res = decoder.decode(buffer);
-        if (decoder.state() == DataDecoder.STATE_FINISHED) {
-            long now = client.timeGenerator.currentTimeMillis();
-            HttpResponse resp = (HttpResponse) decoder.getResult();
-
-            if (LOGGER.isDebugEnabled()) {
-                debug(LOGGER, flowContext,
-                        "Response decoded (size: {} B, downloaded: {} ms, status: {})",
-                        decoder.getHeaderSize() + decoder.getContentSize(),
-                        now - flowContext.receivedStartTimestamp(),
-                        resp.getStatus().getCode()
-                );
-            }
-
-            if (client.isCollectMetrics()) {
-                synchronized (client.metric) {
-                    client.metric.incResponsesNum();
-                    client.metric.addResponseStatusCode(resp.getStatus().getCode());
-                    client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
-                    client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
-                    if (lastRequest != null) {
-                        client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
-                    }
-                }
-            }
-
-            resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize());
-            resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize());
-            resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp());
-            resp.setTag(TAG_TIME_END, now);
-        }
-
-        return res;
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java	Fri May 31 09:27:03 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,26 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.data.ByteBuff;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpRequestEncoder;
-import com.passus.st.client.ClientXDataEncoder;
-import com.passus.st.client.FlowContext;
-
-import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE;
-import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE;
-
-public class HttpClientXDataEncoder implements ClientXDataEncoder<HttpRequest> {
-
-    private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder();
-
-    @Override
-    public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) {
-        reqEncoder.encodeHeaders(request, out);
-        long headerSize = out.readableBytes();
-        reqEncoder.encodeContent(request, out);
-
-        request.setTag(TAG_HEADER_SIZE, headerSize);
-        request.setTag(TAG_CONTENT_SIZE, flowContext.buffer().readableBytes() - headerSize);
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,90 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.net.http.HttpRequest;
+import com.passus.st.client.FlowHandler;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_END;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_START;
+
+public class HttpFlowHandler implements FlowHandler, TimeAware {
+
+    private final HttpFlowHandlerDataDecoder decoder;
+
+    private final HttpFlowHandlerDataEncoder encoder;
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    HttpClientWorkerMetric metric;
+
+    public HttpFlowHandler() {
+        decoder = new HttpFlowHandlerDataDecoder(this);
+        encoder = new HttpFlowHandlerDataEncoder();
+    }
+
+    @Override
+    public int getProtocolId() {
+        return HTTP;
+    }
+
+    @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return decoder;
+    }
+
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
+    @Override
+    public void init(FlowContext flowContext) {
+        //TODO Poprawic, w HttpScopes sa parametry globalne
+        flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes()));
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+
+    }
+
+    @Override
+    public void onDataWriteStart(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now);
+    }
+
+    @Override
+    public void onDataWriteEnd(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,102 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.http.HttpFullMessageDecoder;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+import static com.passus.st.client.http.HttpConsts.*;
+import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
+
+public class HttpFlowHandlerDataDecoder implements FlowHandlerDataDecoder<HttpResponse> {
+
+    private static final Logger LOGGER = LogManager.getLogger(HttpFlowHandlerDataDecoder.class);
+
+    private final HttpFullMessageDecoder decoder;
+
+    private final HttpFlowHandler client;
+
+    private HttpRequest lastRequest;
+
+    public HttpFlowHandlerDataDecoder(HttpFlowHandler client) {
+        this.client = client;
+        decoder = new HttpFullMessageDecoder();
+        decoder.setDecodeRequest(false);
+    }
+
+    @Override
+    public HttpResponse getResult() {
+        return (HttpResponse) decoder.getResult();
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    @Override
+    public void clear(FlowContext flowContext) {
+        if (lastRequest != null) {
+            extractHttpContext(flowContext).scopes().removeConversation(lastRequest);
+            lastRequest = null;
+        }
+
+        decoder.clear();
+    }
+
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        if (flowContext.sentEvent() != null) {
+            lastRequest = (HttpRequest) flowContext.sentEvent().getRequest();
+        }
+
+        if (lastRequest != null) {
+            decoder.setRequestMethod(lastRequest.getMethod());
+        }
+
+        int res = decoder.decode(buffer);
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            long now = client.timeGenerator.currentTimeMillis();
+            HttpResponse resp = (HttpResponse) decoder.getResult();
+
+            if (LOGGER.isDebugEnabled()) {
+                debug(LOGGER, flowContext,
+                        "Response decoded (size: {} B, downloaded: {} ms, status: {})",
+                        decoder.getHeaderSize() + decoder.getContentSize(),
+                        now - flowContext.receivedStartTimestamp(),
+                        resp.getStatus().getCode()
+                );
+            }
+
+            if (client.isCollectMetrics()) {
+                synchronized (client.metric) {
+                    client.metric.incResponsesNum();
+                    client.metric.addResponseStatusCode(resp.getStatus().getCode());
+                    client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
+                    client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+                    if (lastRequest != null) {
+                        client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
+                    }
+                }
+            }
+
+            resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize());
+            resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize());
+            resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp());
+            resp.setTag(TAG_TIME_END, now);
+        }
+
+        return res;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataEncoder.java	Fri May 31 10:53:30 2019 +0200
@@ -0,0 +1,26 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.client.FlowContext;
+
+import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE;
+import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE;
+
+public class HttpFlowHandlerDataEncoder implements FlowHandlerDataEncoder<HttpRequest> {
+
+    private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder();
+
+    @Override
+    public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) {
+        reqEncoder.encodeHeaders(request, out);
+        long headerSize = out.readableBytes();
+        reqEncoder.encodeContent(request, out);
+
+        request.setTag(TAG_HEADER_SIZE, headerSize);
+        request.setTag(TAG_CONTENT_SIZE, flowContext.buffer().readableBytes() - headerSize);
+    }
+
+}