changeset 1008:469c65b34298

FlowWorkerBase refactoring in progress
author Devel 2
date Fri, 20 Mar 2020 14:49:19 +0100
parents bacaa71e4d16
children 654309b16935
files stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java stress-tester/src/test/java/com/passus/st/utils/TriFunction.java
diffstat 15 files changed, 442 insertions(+), 116 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Fri Mar 20 14:49:19 2020 +0100
@@ -148,23 +148,6 @@
     }
 
     @Override
-    public void sessionInvalidated(SessionInfo session) throws Exception {
-        synchronized (lock) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Session {} invalidated.", session);
-            }
-
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-
-            addBlockedSession(session);
-            lock.notifyAll();
-        }
-    }
-
-    @Override
     protected void flowStateChanged(FlowContext context, int oldState) {
         synchronized (lock) {
             flowStateChanged = true;
@@ -251,7 +234,7 @@
                     FlowContext flowContext = flowContext(statusEvent);
                     if (flowContext != null) {
                         if (flowContext.state() != FlowContext.STATE_REQ_SENT) {
-                            close(statusEvent);
+                            disconnect(statusEvent);
                         }
                     }
                 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Fri Mar 20 14:49:19 2020 +0100
@@ -25,6 +25,8 @@
 
     protected final SessionInfo session;
 
+    protected boolean blocked;
+
     protected ByteBuff buffer;
 
     protected ChannelContext channelContext;
@@ -45,6 +47,14 @@
 
     private boolean bidirectional = true;
 
+    protected int connectionAttempts;
+
+    protected int encoderErrors;
+
+    protected int decoderErrors;
+
+    protected int sendErrors;
+
     @Deprecated
     protected DataDecoder decoder;
 
@@ -155,6 +165,10 @@
         return session;
     }
 
+    public boolean isBlocked() {
+        return blocked;
+    }
+
     public Object getParam(String name) {
         if (params == null) {
             return null;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Fri Mar 20 14:49:19 2020 +0100
@@ -269,7 +269,7 @@
         }
 
         for (FlowWorker worker : workers) {
-            worker.close();
+            worker.disconnect();
             worker.interrupt();
 
             try {
@@ -284,7 +284,7 @@
 
     public void closeAllConnections() {
         for (FlowWorker worker : workers) {
-            worker.close();
+            worker.disconnect();
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Fri Mar 20 14:49:19 2020 +0100
@@ -20,7 +20,7 @@
         defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L);
         defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L);
         defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L);
+        defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE);
         defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L);
         defaultTimeouts.put(STATE_DISCONNECTED, 0L);
         DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Fri Mar 20 14:49:19 2020 +0100
@@ -128,9 +128,9 @@
 
     public abstract int activeConnections();
 
-    public abstract void close();
+    public abstract void disconnect();
 
-    public abstract void close(SessionInfo session);
+    public abstract void disconnect(SessionInfo session);
 
     public abstract void handle(Event event);
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Fri Mar 20 14:49:19 2020 +0100
@@ -12,6 +12,7 @@
 import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
 import it.unimi.dsi.fastutil.ints.Int2LongMap;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +39,12 @@
 
     private long nextCheckTimeoutsTime = -1;
 
+    private int maxConnectionAttempts = 3;
+
+    private int maxEncoderErrors = 3;
+
+    private long reconnectDelay = 1000;
+
     private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
 
     private long lastEventTimestamp = -1;
@@ -92,7 +99,25 @@
         this.checkTimeoutsPeriod = checkTimeoutsPeriod;
     }
 
-    protected final void changeFlowState(FlowContext flowContext, int state) {
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    public void setReconnectDelay(long reconnectDelay) {
+        this.reconnectDelay = reconnectDelay;
+    }
+
+    public int getMaxEncoderErrors() {
+        return maxEncoderErrors;
+    }
+
+    public void setMaxEncoderErrors(int maxEncoderErrors) {
+        this.maxEncoderErrors = maxEncoderErrors;
+    }
+
+
+
+    /*protected final void changeFlowState(FlowContext flowContext, int state) {
         try {
             if (flowContext.state() == state) {
                 return;
@@ -108,20 +133,12 @@
 
             switch (state) {
                 case FlowContext.STATE_CONNECTING:
-                    flowContext.clear();
-                    break;
                 case FlowContext.STATE_CONNECTED:
-                    flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
-                    break;
                 case FlowContext.STATE_ERROR:
-                    changeFlowState(flowContext, STATE_DISCONNECTED);
-                    break;
                 case FlowContext.STATE_RESP_RECEIVED:
-                    flowContext.sentEvent(null);
-                    flowContext.receivedStartTimestamp(-1);
-                    break;
                 case FlowContext.STATE_DISCONNECTING:
-                    if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
+                    throw new RuntimeException("Removed.");
+                    *//*if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
                         if (flowContext.channelContext() != null) {
                             try {
                                 flowContext.channelContext().close();
@@ -135,8 +152,7 @@
                         }
                     } else {
                         return;
-                    }
-                    break;
+                    }*//*
                 case STATE_DISCONNECTED:
                     flowContext.state(STATE_DISCONNECTED);
                     flowContext.clear();
@@ -145,14 +161,12 @@
                     return;
             }
 
-            long timeout = timeouts.get(flowContext.state());
-            flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
-            flowContext.state(state);
-            flowStateChanged(flowContext, oldState);
+            updateFlowState(flowContext, state, oldState);
+            throw new RuntimeException("Removed.");
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
-    }
+    }*/
 
     protected void flowStateChanged(FlowContext context, int oldState) {
 
@@ -211,8 +225,7 @@
             try {
                 FlowContext flowContext = register(session);
                 if (flowContext != null) {
-                    emitter.connect(session, this, index);
-                    return flowContext;
+                    connect(flowContext);
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -221,13 +234,24 @@
         }
     }
 
+    protected void connect(FlowContext flowContext) {
+        synchronized (lock) {
+            try {
+                flowContext.connectionAttempts++;
+                emitter.connect(flowContext.session, this, index);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
     @Override
-    public void close() {
+    public void disconnect() {
         synchronized (lock) {
             for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
                 FlowContext flowContext = entry.getValue();
                 try {
-                    closeSession(flowContext);
+                    disconnect(flowContext);
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
@@ -240,14 +264,18 @@
         }
     }
 
-    protected void close(SessionEvent sessionEvent) {
-        close(sessionEvent.getSessionInfo());
+    protected void disconnect(SessionEvent sessionEvent) {
+        disconnect(sessionEvent.getSessionInfo());
     }
 
-    protected void close(FlowContext flowContext) {
+    @Override
+    public void disconnect(SessionInfo session) {
         synchronized (lock) {
             try {
-                closeSession(flowContext);
+                FlowContext flowContext = flowContext(session);
+                if (flowContext != null) {
+                    disconnect(flowContext, true);
+                }
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     logger.debug(e.getMessage(), e);
@@ -256,26 +284,61 @@
         }
     }
 
-    @Override
-    public void close(SessionInfo session) {
-        synchronized (lock) {
+    protected void disconnect(FlowContext flowContext) {
+        disconnect(flowContext, true);
+    }
+
+    protected void disconnect(FlowContext flowContext, boolean removeFlow) {
+        if (flowContext.channelContext() != null) {
+            updateFlowState(flowContext, STATE_DISCONNECTING);
             try {
-                FlowContext flowContext = flowContext(session);
-                closeSession(flowContext);
+                flowContext.channelContext().close();
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     logger.debug(e.getMessage(), e);
                 }
             }
+
+            flowContext.clear();
+            if (removeFlow) {
+                removeFlowContext(flowContext);
+            }
+
+            updateFlowState(flowContext, STATE_DISCONNECTED);
         }
     }
 
-    protected void closeSession(FlowContext flowContext) {
-        synchronized (lock) {
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
+    private void error(FlowContext flowContext, Throwable cause) {
+        if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) {
+            disconnect(flowContext, false);
         }
+
+        flowContext.blocked = true;
+        updateFlowState(flowContext, STATE_ERROR);
+    }
+
+    private void responseReceived(FlowContext flowContext) {
+        flowContext.sentEvent(null);
+        flowContext.receivedStartTimestamp(-1);
+        updateFlowState(flowContext, STATE_RESP_RECEIVED);
+    }
+
+    private void updateFlowState(FlowContext flowContext, int newState) {
+        updateFlowState(flowContext, newState, flowContext.state);
+    }
+
+    private void updateFlowState(FlowContext flowContext, int newState, int oldState) {
+        if (logger.isDebugEnabled()) {
+            debug(flowContext, "Flow status changing {} -> {}.",
+                    contextStateToString(flowContext.state()),
+                    contextStateToString(newState)
+            );
+        }
+
+        long timeout = timeouts.get(newState);
+        flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
+        flowContext.state(newState);
+        flowStateChanged(flowContext, oldState);
     }
 
     protected void removeFlowContext(FlowContext flowContext) {
@@ -293,7 +356,7 @@
                 }
 
                 SessionInfo session = flowContext.sessionInfo();
-                changeFlowState(flowContext, FlowContext.STATE_CONNECTING);
+                updateFlowState(flowContext, FlowContext.STATE_CONNECTING);
                 emitter.connect(session, this, index);
             } catch (Exception e) {
                 error(flowContext, e.getMessage(), e);
@@ -304,7 +367,7 @@
     protected void closeAllConnections() {
         synchronized (lock) {
             for (FlowContext flowContext : sessions.values()) {
-                closeSession(flowContext);
+                disconnect(flowContext);
             }
         }
     }
@@ -355,7 +418,9 @@
                 context.setBidirectional(flowContext.isBidirectional());
                 flowContext.channelContext(context);
                 context.setAttachment(flowContext);
-                changeFlowState(flowContext, STATE_CONNECTED);
+                flowContext.connectionAttempts = 0;
+                flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
+                updateFlowState(flowContext, STATE_CONNECTED);
             }
 
             lock.notifyAll();
@@ -370,7 +435,24 @@
                 debug(flowContext, "Channel inactive.");
             }
 
-            changeFlowState(flowContext, STATE_DISCONNECTED);
+            updateFlowState(flowContext, STATE_DISCONNECTED);
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void sessionInvalidated(SessionInfo session) throws Exception {
+        synchronized (lock) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Session {} invalidated.", session);
+            }
+
+            FlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                disconnect(flowContext);
+            }
+
+            addBlockedSession(session);
             lock.notifyAll();
         }
     }
@@ -401,7 +483,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                    responseReceived(flowContext);
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
                     if (collectMetric) {
                         synchronized (metric) {
@@ -425,7 +507,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                    responseReceived(flowContext);
                 }
             } catch (Exception e) {
                 if (collectMetric) {
@@ -469,7 +551,7 @@
 
                 flowContext.client().onDataWriteEnd(flowContext);
                 if (!flowContext.isBidirectional()) {
-                    changeFlowState(flowContext, STATE_RESP_RECEIVED);
+                    responseReceived(flowContext);
                 }
             }
 
@@ -478,14 +560,36 @@
     }
 
     @Override
-    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
+    public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         if (logger.isDebugEnabled()) {
             logger.debug("Error occurred. " + cause.getMessage(), cause);
         }
 
         synchronized (lock) {
             FlowContext flowContext = (FlowContext) context.getAttachment();
-            changeFlowState(flowContext, FlowContext.STATE_ERROR);
+            // If the connection was never established, flowContext == null here.
+            if (flowContext == null) {
+                flowContext = flowContext(context);
+            }
+
+            if (flowContext.state == STATE_CONNECTING) {
+                if (flowContext.connectionAttempts < maxConnectionAttempts) {
+                    // TODO - suboptimal: this blocks event processing for the given session.
+                    // It will hurt the performance of the asynchronous worker.
+                    if (reconnectDelay > 0) {
+                        try {
+                            Thread.sleep(reconnectDelay);
+                        } catch (InterruptedException ignore) {
+
+                        }
+                    }
+
+                    connect(flowContext);
+                    return;
+                }
+            }
+
+            error(flowContext, cause);
             lock.notifyAll();
         }
     }
@@ -498,10 +602,24 @@
                     return false;
                 }
 
+                ByteBuff buffer = null;
                 FlowHandler client = flowContext.client();
                 FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
-                ByteBuff buffer = flowContext.buffer();
-                encoder.encode(req, flowContext, buffer);
+                buffer = flowContext.buffer();
+                try {
+                    encoder.encode(req, flowContext, buffer);
+                } catch (Exception e) {
+                    flowContext.encoderErrors++;
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, e.getMessage(), e);
+                    }
+
+                    if (flowContext.encoderErrors == maxEncoderErrors) {
+                        error(flowContext, new IOException());
+                    }
+
+                    return false;
+                }
 
                 if (collectMetric) {
                     synchronized (metric) {
@@ -511,12 +629,13 @@
                 }
 
                 try {
-                    changeFlowState(flowContext, FlowContext.STATE_REQ_SENT);
+                    updateFlowState(flowContext, FlowContext.STATE_REQ_SENT);
                     flowContext.sentEvent(event);
                     flowContext.channelContext().writeAndFlush(buffer);
                     buffer.clear();
                     return true;
                 } catch (Exception e) {
+                    flowContext.sendErrors++;
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
                     }
@@ -548,7 +667,7 @@
                                 case FlowContext.STATE_CONNECTED:
                                 case FlowContext.STATE_REQ_SENT:
                                 case FlowContext.STATE_ERROR:
-                                    closeSession(flowContext);
+                                    disconnect(flowContext);
                                     break;
                                 case FlowContext.STATE_RESP_RECEIVED:
                                     //Dziwny blad nie powinien wystepowac
--- a/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java	Fri Mar 20 14:49:19 2020 +0100
@@ -20,12 +20,12 @@
     }
 
     @Override
-    public void close() {
+    public void disconnect() {
 
     }
 
     @Override
-    public void close(SessionInfo session) {
+    public void disconnect(SessionInfo session) {
 
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Fri Mar 20 14:49:19 2020 +0100
@@ -110,7 +110,7 @@
                 LocalFlowContext indexFlowContext = it.next();
                 if (indexFlowContext.eventsQueue.isEmpty()
                         && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
-                    close(flowContext);
+                    disconnect(flowContext);
                     if (--diff == 0) {
                         break;
                     }
@@ -142,7 +142,7 @@
             if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
                     && localFlowContext.state() != FlowContext.STATE_REQ_SENT
                     && localFlowContext.eventsQueue.isEmpty()) {
-                close(flowContext);
+                disconnect(flowContext);
                 return;
             }
         }
@@ -157,7 +157,7 @@
                 SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                 if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     localFlowContext.eventsQueue.poll();
-                    close((SessionStatusEvent) event);
+                    disconnect((SessionStatusEvent) event);
                 }
             } else if (event.getType() == SessionPayloadEvent.TYPE
                     && canSend(flowContext)) {
@@ -217,7 +217,7 @@
                         if (flowContext != null) {
                             if (flowContext.eventsQueue.isEmpty()
                                     && flowContext.state() != FlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
+                                disconnect(statusEvent);
                             } else {
                                 addToQueue(flowContext, event);
                             }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri Mar 20 14:49:19 2020 +0100
@@ -48,23 +48,6 @@
     }
 
     @Override
-    public void sessionInvalidated(SessionInfo session) throws Exception {
-        synchronized (lock) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Session {} invalidated.", session);
-            }
-
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-
-            addBlockedSession(session);
-            lock.notifyAll();
-        }
-    }
-
-    @Override
     protected void flowStateChanged(FlowContext context, int oldState) {
         if (logger.isDebugEnabled()) {
             logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state()));
@@ -131,10 +114,6 @@
      * @return boolean
      */
     private boolean pollNext() {
-        if (currFlowContext != null) {
-            return false;
-        }
-
         Event event = eventsQueue.poll();
         if (event != null) {
             sleep(event);
@@ -166,7 +145,7 @@
                         currFlowContext = flowContext((SessionEvent) event);
                         if (currFlowContext != null) {
                             if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
-                                close(statusEvent);
+                                disconnect(statusEvent);
                             }
                         }
                     }
@@ -174,6 +153,10 @@
                     return true;
                 } else if (event.getType() == SessionPayloadEvent.TYPE) {
                     FlowContext flowContext = flowContext(sessEvent);
+                    if (flowContext.blocked) {
+                        return true;
+                    }
+
                     if (flowContext != null) {
                         switch (flowContext.state()) {
                             case FlowContext.STATE_CONNECTED:
@@ -256,10 +239,10 @@
     }
 
     @Override
-    public void close() {
+    public void disconnect() {
         synchronized (lock) {
             eventsQueue.clear();
-            super.close();
+            super.disconnect();
             lock.notifyAll();
         }
     }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Fri Mar 20 14:49:19 2020 +0100
@@ -4,6 +4,7 @@
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
 import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.FlowHandler;
 import com.passus.st.client.FlowHandlerDataDecoder;
@@ -16,9 +17,9 @@
 
 public class HttpFlowHandler implements FlowHandler, TimeAware {
 
-    private final HttpFlowHandlerDataDecoder decoder;
+    private final FlowHandlerDataDecoder<HttpResponse> decoder;
 
-    private final HttpFlowHandlerDataEncoder encoder;
+    private final FlowHandlerDataEncoder<HttpRequest> encoder;
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
@@ -34,6 +35,12 @@
         scopes = new HttpScopes();
     }
 
+    public HttpFlowHandler(FlowHandlerDataDecoder<HttpResponse> decoder,
+                           FlowHandlerDataEncoder<HttpRequest> encoder) {
+        this.decoder = decoder;
+        this.encoder = encoder;
+    }
+
     @Override
     public int getProtocolId() {
         return HTTP;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Fri Mar 20 14:49:19 2020 +0100
@@ -55,7 +55,11 @@
     }
 
     public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort) {
-        this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort);
+        this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT, UNKNOWN);
+    }
+
+    public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort, int transport, int protocolId) {
+        this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort, transport, protocolId);
     }
 
     public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Fri Mar 20 14:49:19 2020 +0100
@@ -0,0 +1,159 @@
+package com.passus.st.client;
+
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.utils.TriFunction;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.ConnectException;
+
+import static com.passus.net.session.Session.PROTOCOL_TCP;
+import static com.passus.st.Protocols.HTTP;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.*;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNull;
+
+public class FlowWorkerBaseTest {
+
+    private final SessionInfo session = new SessionInfo("1.1.1.1", 10, "2.2.2.2", 20, PROTOCOL_TCP, HTTP);
+
+    private final Emitter mockEmitter = mock(Emitter.class);
+
+    private ChannelContext mockChannelContext() {
+        return mockChannelContext(session);
+    }
+
+    private TestFlowWorker worker() {
+        TestFlowWorker worker = new TestFlowWorker(mockEmitter);
+        worker.setReconnectDelay(0);
+        return worker;
+    }
+
+    private ChannelContext mockChannelContext(SessionInfo session) {
+        ChannelContext mockChannelContext = mock(ChannelContext.class);
+        when(mockChannelContext.getSessionInfo()).thenReturn(session);
+        return mockChannelContext;
+    }
+
+    private Void applyConnect(InvocationOnMock invocation, TriFunction<SessionInfo, EmitterHandler, Integer, Void> callback) throws Exception {
+        Object[] arguments = invocation.getArguments();
+        SessionInfo session = (SessionInfo) arguments[0];
+        EmitterHandler handler = (EmitterHandler) arguments[1];
+        Integer index = (Integer) arguments[2];
+
+        callback.apply(session, handler, index);
+        return null;
+    }
+
+    private ChannelContext makeConnected(TestFlowWorker worker, SessionInfo session) throws Exception {
+        ChannelContext channelContext = mockChannelContext();
+        doAnswer((Answer<Void>) invocation ->
+                applyConnect(invocation, (sessionInfo, handler, integer) -> {
+                    handler.channelActive(channelContext);
+                    return null;
+                })
+        ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt());
+        worker.connect(session);
+        FlowContext flowContext = worker.flowContext(session);
+        return channelContext;
+    }
+
+    @Test
+    public void testConnectionSuccess() throws Exception {
+        TestFlowWorker worker = worker();
+        makeConnected(worker, session);
+
+        FlowContext flowContext = worker.flowContext(session);
+        assertEquals(FlowContext.STATE_CONNECTED, flowContext.state);
+        assertEquals(false, flowContext.blocked);
+    }
+
+    @Test
+    public void testConnectionSuccess_FirstConnectionFailed() throws Exception {
+        ChannelContext channelContext = mockChannelContext();
+        MutableInt count = new MutableInt(0);
+        doAnswer((Answer<Void>) invocation ->
+                applyConnect(invocation, (sessionInfo, handler, integer) -> {
+
+                    if (count.intValue() == 1) {
+                        handler.channelActive(channelContext);
+                    } else {
+                        count.incrementAndGet();
+                        handler.errorOccurred(channelContext, new ConnectException("connection refused"));
+                    }
+                    return null;
+                })
+        ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt());
+
+        TestFlowWorker worker = worker();
+        worker.connect(session);
+        FlowContext flowContext = worker.flowContext(session);
+
+        assertEquals(1, count.intValue());
+        assertEquals(FlowContext.STATE_CONNECTED, flowContext.state);
+        assertEquals(0, flowContext.connectionAttempts);
+        assertEquals(false, flowContext.blocked);
+    }
+
+    @Test
+    public void testConnectionFailed_ConnectionAttemptsReached() throws Exception {
+        ChannelContext channelContext = mockChannelContext();
+        doAnswer((Answer<Void>) invocation ->
+                applyConnect(invocation, (sessionInfo, handler, integer) -> {
+                    handler.errorOccurred(channelContext, new ConnectException("connection refused"));
+                    return null;
+                })
+        ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt());
+
+        TestFlowWorker worker = worker();
+        worker.connect(session);
+        FlowContext flowContext = worker.flowContext(session);
+
+        assertEquals(FlowContext.STATE_ERROR, flowContext.state);
+        assertEquals(true, flowContext.blocked);
+    }
+
+    @Test
+    public void testDisconnect() throws Exception {
+        TestFlowWorker worker = worker();
+        makeConnected(worker, session);
+
+        FlowContext flowContext = worker.flowContext(session);
+        worker.disconnect(session);
+
+        assertEquals(FlowContext.STATE_DISCONNECTED, flowContext.state);
+        assertEquals(null, flowContext.buffer);
+        assertNull(worker.flowContext(session));
+    }
+
+    public class TestFlowWorker extends FlowWorkerBase {
+
+        public TestFlowWorker(Emitter emitter) {
+            this(emitter, "test", 0);
+        }
+
+        public TestFlowWorker(Emitter emitter, String name, int index) {
+            super(emitter, name, index);
+        }
+
+        @Override
+        protected FlowContext flowContext(SessionInfo session) {
+            return super.flowContext(session);
+        }
+
+        @Override
+        public void handle(Event event) {
+
+        }
+
+    }
+
+}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Fri Mar 20 14:49:19 2020 +0100
@@ -3,6 +3,7 @@
 import com.passus.config.Configuration;
 import com.passus.config.ConfigurationContext;
 import com.passus.data.ByteBuff;
+import com.passus.data.DataEncoder;
 import com.passus.data.HeapByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.net.http.HttpRequest;
@@ -20,9 +21,13 @@
 
 import static com.passus.st.utils.Assert.assertHttpClientEvents;
 import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
 
 public class SynchFlowWorkerTest {
 
+    public static final long JOIN_TIMEOUT = 1_000;
+
+
     private final TestHttpClientListener listener = new TestHttpClientListener();
 
     private static class LocalEmitter implements Emitter {
@@ -31,6 +36,16 @@
 
         private boolean started = false;
 
+        private final DataEncoder encoder;
+
+        public LocalEmitter() { // default emitter: encodes with HttpResponseEncoder.INSTANCE
+            this(HttpResponseEncoder.INSTANCE);
+        }
+
+        public LocalEmitter(DataEncoder encoder) { // lets a test supply the encoder this emitter will use
+            this.encoder = encoder;
+        }
+
         @Override
         public void setSessionMapper(SessionMapper sessionMapper) {
             this.sessionMapper = sessionMapper;
@@ -61,7 +76,7 @@
             HttpRequest request = (HttpRequest) event.getRequest();
             HttpResponse response = (HttpResponse) event.getResponse();
             ByteBuff buff = new HeapByteBuff();
-            HttpResponseEncoder.INSTANCE.encode(response, buff);
+            encoder.encode(response, buff);
             try {
                 clientWorker.dataReceived(channelContext, buff);
             } catch (Exception ex) {
@@ -230,32 +245,66 @@
         listener.clear();
     }
 
+    private void join(SynchFlowWorker worker) {
+        try {
+            worker.join(JOIN_TIMEOUT); // wait is capped by JOIN_TIMEOUT instead of blocking indefinitely
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // restore the flag so the interrupt stays visible to the caller
+        }
+        // Fails the calling test when the worker still reports itself as working after the wait.
+        assertFalse(worker.isWorking());
+    }
+
+    private List<Event> readDefaultEvents() throws Exception {
+        final List<Event> fixture = readEvents("pcap/http/http_req_resp.pcap");
+        assertEquals(4, fixture.size()); // sanity check on the fixture before any worker sees it
+        return fixture;
+    }
+
     @Test
     public void testHandle_HTTP_SimpleRequestResponse() throws Exception {
-        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
-        assertEquals(4, events.size());
-
+        List<Event> events = readDefaultEvents();
         SynchFlowWorker worker = createWorker();
         worker.start();
         SessionEvent sessionEvent = (SessionEvent) events.get(0);
         worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
         events.forEach(worker::handle);
-        worker.join();
+        join(worker);
 
         assertHttpClientEvents(events, listener.events());
     }
 
     @Test
     public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception {
-        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
-        assertEquals(4, events.size());
-
+        List<Event> events = readDefaultEvents();
         SynchFlowWorker worker = createWorker();
         worker.setConnectPartialSession(true);
         worker.start();
         events.forEach(worker::handle);
-        worker.join();
-
+        join(worker);
         assertHttpClientEvents(events, listener.events());
     }
+
+    @Test
+    public void testHandle_EmitterException() throws Exception {
+        List<Event> defaultEvents = readDefaultEvents();
+        LocalEmitter failingEmitter = new LocalEmitter((value, sink) -> {
+            throw new RuntimeException("Test exception");
+        }); // this encoder fails on every encode call
+        SynchFlowWorker flowWorker = new SynchFlowWorker(failingEmitter, "test", 0);
+        flowWorker.setListener(listener);
+        flowWorker.setConnectPartialSession(true);
+        flowWorker.start();
+        defaultEvents.forEach(flowWorker::handle);
+        join(flowWorker); // the only check: join() asserts that the worker is no longer working
+    }
+
+    @Test(enabled = false) // disabled: the scenario below is only sketched so far
+    public void testHandle_EncoderException() throws Exception {
+        List<Event> defaultEvents = readDefaultEvents();
+        SynchFlowWorker flowWorker = createWorker();
+        /*flowWorker.setClientFactory(protocolId -> {
+
+        });*/
+    }
 }
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Wed Mar 18 13:29:01 2020 +0100
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Fri Mar 20 14:49:19 2020 +0100
@@ -53,7 +53,7 @@
         try {
             flowExecutor.start();
             events.forEach(flowExecutor::handle);
-            flowExecutor.join(5_000);
+            flowExecutor.join();
 
             assertEquals(0, listener.getErrors());
             assertEquals(2, listener.getReceived().size());
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/TriFunction.java	Fri Mar 20 14:49:19 2020 +0100
@@ -0,0 +1,8 @@
+package com.passus.st.utils;
+
+@FunctionalInterface
+public interface TriFunction<A, B, C, R> {
+    /** Applies this function to three arguments and returns the result; implementations may throw any checked exception. */
+    R apply(A a, B b, C c) throws Exception;
+
+}
\ No newline at end of file