changeset 1031:37d098b33b23

FlowWorkerBase refactorization in progress
author Devel 2
date Thu, 02 Apr 2020 14:02:38 +0200
parents 170c8ce25bef
children d136672f267c
files stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/Timeouts.java stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java
diffstat 13 files changed, 490 insertions(+), 372 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu Apr 02 14:02:38 2020 +0200
@@ -249,9 +249,10 @@
                         case FlowContext.STATE_CONNECTING:
                             return false;
                         case FlowContext.STATE_CONNECTED:
-                            if(flowContext.isEventSent()) {
+                            if (flowContext.isEventSent()) {
                                 return false;
-                            } else if (send(flowContext, (SessionPayloadEvent) event)) {
+                            } else {
+                                send(flowContext, (SessionPayloadEvent) event, true);
                                 return true;
                             }
                         case FlowContext.STATE_DISCONNECTING:
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Apr 02 14:02:38 2020 +0200
@@ -8,14 +8,17 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class FlowContext {
 
-    public static final byte STATE_ERROR = -1;
-    public static final byte STATE_CONNECTING = 0;
-    public static final byte STATE_CONNECTED = 1;
-    public static final byte STATE_DISCONNECTING = 4;
-    public static final byte STATE_DISCONNECTED = 5;
+    public static final byte STATE_NEW = 0;
+    public static final byte STATE_CONNECTING = 1 << 1;
+    public static final byte STATE_CONNECTED = 1 << 2;
+    public static final byte STATE_DISCONNECTING = 1 << 3;
+    public static final byte STATE_DISCONNECTED = 1 << 4;
+    public static final byte STATE_ERROR = 1 << 5;
 
     public static final int INIT_BUFFER_CAPACITY = 1024;
 
@@ -23,7 +26,9 @@
 
     protected final SessionInfo session;
 
-    protected boolean blocked;
+    protected volatile boolean blocked;
+
+    protected volatile boolean sessionEstablishedSeen;
 
     protected ByteBuff buffer;
 
@@ -33,17 +38,19 @@
 
     protected byte state = STATE_CONNECTING;
 
+    protected long connectionTime = -1;
+
     protected long timeout = -1;
 
     protected long receivedStartTime = -1;
 
     protected long sendStartTime = -1;
 
-    private int loop;
+    protected int loop;
 
-    private FlowHandler client;
+    protected FlowHandler client;
 
-    private boolean bidirectional = true;
+    protected boolean bidirectional = true;
 
     protected int connectionAttempts;
 
@@ -55,6 +62,10 @@
 
     protected FlowError error;
 
+    protected ReentrantLock lock;
+
+    protected Condition lockCond;
+
     @Deprecated
     protected DataDecoder decoder;
 
@@ -64,6 +75,23 @@
         this.session = session;
     }
 
+    public void createLock() {
+        lock = new ReentrantLock();
+        lockCond = lock.newCondition();
+    }
+
+    void lock() {
+        lock.lock();
+    }
+
+    void unlock() {
+        lock.unlock();
+    }
+
+    void signal() {
+        lockCond.signal();
+    }
+
     public boolean isBidirectional() {
         return bidirectional;
     }
@@ -96,6 +124,14 @@
         return state;
     }
 
+    public boolean isError() {
+        return state == STATE_ERROR;
+    }
+
+    public boolean isConnected() {
+        return state == STATE_CONNECTED;
+    }
+
     public void timeout(long timeout) {
         this.timeout = timeout;
     }
@@ -118,10 +154,6 @@
         return error;
     }
 
-    public boolean isError() {
-        return error != null;
-    }
-
     public long receivedStartTimestamp() {
         return receivedStartTime;
     }
@@ -231,6 +263,7 @@
     }
 
     public void clear() {
+        sessionEstablishedSeen = false;
         buffer = null;
         sentEvent = null;
         timeout = -1;
@@ -244,10 +277,13 @@
 
     @Override
     public String toString() {
-        return "FlowContext{state=" + contextStateToString(state) + '}';
+        return "FlowContext{" +
+                "session=" + session +
+                ", state=" + stateToString(state) +
+                '}';
     }
 
-    public static String contextStateToString(int state) {
+    public static String stateToString(int state) {
         switch (state) {
             case STATE_ERROR:
                 return "error";
@@ -255,10 +291,6 @@
                 return "connecting";
             case STATE_CONNECTED:
                 return "connected";
-/*            case STATE_REQ_SENT:
-                return "req_sent";
-            case STATE_RESP_RECEIVED:
-                return "resp_received";*/
             case STATE_DISCONNECTING:
                 return "disconnecting";
             case STATE_DISCONNECTED:
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Thu Apr 02 14:02:38 2020 +0200
@@ -33,6 +33,8 @@
     public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18;
     public static final byte CODE_SSL_HANDSHAKE_ERROR = 19;
 
+    public static final byte CODE_INTERNAL_ERROR = (byte) 255;
+
     private final byte code;
 
     private final String message;
@@ -78,6 +80,14 @@
         return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", cause);
     }
 
+    public static FlowError internalError() {
+        return internalError(null);
+    }
+
+    public static FlowError internalError(Throwable cause) {
+        return new FlowError(CODE_INTERNAL_ERROR, "Internal error.", cause);
+    }
+
     public static FlowError interpret(Throwable ex, String stackTrace) {
         if (stackTrace.contains("Connection refused")) {
             return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Thu Apr 02 14:02:38 2020 +0200
@@ -12,18 +12,6 @@
 
 public class FlowUtils {
 
-    public static final Map<Byte, Long> DEFAULT_TIMEOUTS;
-
-    static {
-        Map<Byte, Long> defaultTimeouts = new HashMap<>();
-        defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L);
-        defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE);
-        defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L);
-        defaultTimeouts.put(STATE_DISCONNECTED, 0L);
-        DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
-    }
-
     private FlowUtils() {
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Thu Apr 02 14:02:38 2020 +0200
@@ -35,11 +35,14 @@
 
     protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
+    protected final boolean trace;
+
     public FlowWorker(Emitter emitter, String name, int index) {
         super(name + index);
         Assert.notNull(emitter, "emitter");
         this.emitter = emitter;
         this.index = index;
+        this.trace = logger.isTraceEnabled();
     }
 
     public boolean isConnectPartialSession() {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 02 14:02:38 2020 +0200
@@ -9,18 +9,15 @@
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.metric.MetricsContainer;
-import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap;
-import it.unimi.dsi.fastutil.bytes.Byte2LongMap;
 
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import static com.passus.st.client.FlowContext.*;
-import static com.passus.st.client.FlowError.CODE_CONNECTION_ATTEMPTS_REACHED;
-import static com.passus.st.client.FlowError.CODE_MAX_ENCODER_ERRORS_REACHED;
-import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
+import static com.passus.st.client.FlowError.*;
 
 public abstract class FlowWorkerBase extends FlowWorker {
 
@@ -30,7 +27,7 @@
 
     private final Set<SessionInfo> blockedSessions = new HashSet<>();
 
-    private final Byte2LongMap timeouts = new Byte2LongArrayMap();
+    private final Timeouts timeouts = new Timeouts();
 
     protected final Object lock = new Object();
 
@@ -52,7 +49,6 @@
 
     public FlowWorkerBase(Emitter emitter, String name, int index) {
         super(emitter, name, index);
-        timeouts.putAll(DEFAULT_TIMEOUTS);
     }
 
     @Override
@@ -140,7 +136,9 @@
     }
 
     protected FlowContext createFlowContext(SessionInfo session) {
-        return new FlowContext(session);
+        FlowContext flowContext = new FlowContext(session);
+        flowContext.createLock();
+        return flowContext;
     }
 
     protected FlowContext register(SessionEvent sessionEvent) {
@@ -148,20 +146,18 @@
     }
 
     protected FlowContext register(SessionInfo session) {
-        synchronized (lock) {
-            if (sessions.containsKey(session)) {
-                logger.warn("Unable to register session '" + session + "'. Session already registered.");
-                return null;
-            }
+        if (sessions.containsKey(session)) {
+            logger.warn("Unable to register session '" + session + "'. Session already registered.");
+            return null;
+        }
 
-            FlowContext flowContext = createFlowContext(session);
-            //TODO Malo optymalne
-            FlowHandler client = clientFactory.create(session.getProtocolId());
-            client.init(flowContext);
-            flowContext.client(client);
-            sessions.put(session, flowContext);
-            return flowContext;
-        }
+        FlowContext flowContext = createFlowContext(session);
+        //TODO Malo optymalne
+        FlowHandler client = clientFactory.create(session.getProtocolId());
+        client.init(flowContext);
+        flowContext.client(client);
+        sessions.put(session, flowContext);
+        return flowContext;
     }
 
     protected FlowContext connect(SessionEvent sessionEvent) {
@@ -173,43 +169,61 @@
             try {
                 FlowContext flowContext = register(session);
                 if (flowContext != null) {
-                    connect(flowContext);
+                    connect(flowContext, true);
                 }
             } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                logger.error(e);
             }
             return null;
         }
     }
 
+    protected FlowContext registerAndConnect(SessionInfo session, boolean wait) {
+        FlowContext flowContext = flowContext(session);
+        if (flowContext != null) {
+            throw new RuntimeException("Not implemented yet.");
+        } else {
+            flowContext = register(session);
+        }
+
+        connect(flowContext, wait);
+        return flowContext;
+    }
+
     protected void connect(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                flowContext.connectionAttempts++;
-                emitter.connect(flowContext.session, this, index);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+        connect(flowContext, true);
+    }
+
+    protected void connect(FlowContext flowContext, boolean wait) {
+        flowContext.lock.lock();
+        try {
+            flowContext.connectionAttempts++;
+            emitter.connect(flowContext.session, this, index);
+            if (wait) {
+                waitOpFinished(flowContext, STATE_CONNECTED);
             }
+        } catch (Exception ex) {
+            error(flowContext, ex);
+        } finally {
+            flowContext.lock.unlock();
         }
     }
 
     @Override
     public void disconnect() {
-        synchronized (lock) {
-            for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
-                FlowContext flowContext = entry.getValue();
-                try {
-                    disconnect(flowContext);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, e.getMessage(), e);
-                    }
+        for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
+            FlowContext flowContext = entry.getValue();
+            try {
+                disconnect(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, e.getMessage(), e);
                 }
             }
+        }
 
-            sessions.clear();
-            working = false;
-        }
+        sessions.clear();
+        working = false;
     }
 
     protected void disconnect(SessionEvent sessionEvent) {
@@ -218,16 +232,14 @@
 
     @Override
     public void disconnect(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = flowContext(session);
-                if (flowContext != null) {
-                    disconnect(flowContext, true);
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
+        try {
+            FlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                disconnect(flowContext, true);
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getMessage(), e);
             }
         }
     }
@@ -237,22 +249,70 @@
     }
 
     protected void disconnect(FlowContext flowContext, boolean removeFlow) {
-        if (flowContext.channelContext() != null) {
-            updateFlowState(flowContext, STATE_DISCONNECTING);
+        disconnect(flowContext, removeFlow, true);
+    }
+
+    protected void disconnect(FlowContext flowContext, boolean removeFlow, boolean wait) {
+        if (logger.isDebugEnabled()) {
+            debug(flowContext, "Disconnect.");
+        }
+
+        SessionInfo sessionInfo = null;
+        long now = timeGenerator.currentTimeMillis();
+        flowContext.lock.lock();
+        try {
+            if (flowContext.state == STATE_DISCONNECTING
+                    || flowContext.state == STATE_DISCONNECTED) {
+                return;
+            }
+
+            /*if (removeFlow) {
+                sessionInfo = flowContext.session;
+            }*/
+
+            flowContext.state = STATE_DISCONNECTING;
+            flowContext.timeout = now + timeouts.getDisconnectingTimeout();
+
             try {
-                flowContext.channelContext().close();
+                onDisconnecting(flowContext);
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
+                    debug(flowContext, "Error occurred during onDisconnecting calling.", e);
                 }
             }
 
-            flowContext.clear();
-            if (removeFlow) {
-                removeFlowContext(flowContext);
+            if (flowContext.channelContext() != null) {
+                try {
+                    flowContext.channelContext().close();
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
             }
 
-            updateFlowState(flowContext, STATE_DISCONNECTED);
+            if (wait) {
+                waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout());
+            }
+        } catch (InterruptedException e) {
+            error(flowContext, e);
+        } finally {
+            flowContext.lock.unlock();
+        }
+
+        if (removeFlow && sessionInfo != null) {
+            synchronized (sessions) {
+                sessions.remove(sessionInfo);
+            }
+        }
+    }
+
+    protected void errorInternal(FlowContext flowContext, Throwable cause) {
+        if (flowContext == null) {
+            logger.error("Internal error.", cause);
+        } else {
+            logger.error("Flow {} internal error.", flowContext, cause);
+            error(flowContext, FlowError.internalError(cause));
         }
     }
 
@@ -266,10 +326,10 @@
         }
 
         flowContext.error(error);
-        flowError(flowContext);
+        onError(flowContext);
     }
 
-    protected void flowError(FlowContext flowContext) {
+    protected void onError(FlowContext flowContext) {
 
     }
 
@@ -278,36 +338,30 @@
         onRequestSent(flowContext, event);
     }
 
-    protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) {
-
-    }
-
     private void responseReceived0(FlowContext flowContext, Object response) {
         onResponseReceived(flowContext, response);
         flowContext.sentEvent(null);
         flowContext.receivedStartTimestamp(-1);
     }
 
-    protected void onResponseReceived(FlowContext flowContext, Object response) {
+    protected void onConnected(FlowContext flowContext) {
 
     }
 
-    private void updateFlowState(FlowContext flowContext, byte newState) {
-        updateFlowState(flowContext, newState, flowContext.state);
+    protected void onDisconnecting(FlowContext flowContext) {
+
     }
 
-    private void updateFlowState(FlowContext flowContext, byte newState, byte oldState) {
-        if (logger.isDebugEnabled()) {
-            debug(flowContext, "Flow status changing {} -> {}.",
-                    contextStateToString(flowContext.state()),
-                    contextStateToString(newState)
-            );
-        }
+    protected void onDisconnected(FlowContext flowContext) {
 
-        long timeout = timeouts.get(newState);
-        flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
-        flowContext.state(newState);
-        flowStateChanged(flowContext, oldState);
+    }
+
+    protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) {
+
+    }
+
+    protected void onResponseReceived(FlowContext flowContext, Object response) {
+
     }
 
     protected void removeFlowContext(FlowContext flowContext) {
@@ -317,11 +371,11 @@
         }
     }
 
-    protected void reconnect(FlowContext flowContext) {
+    /*protected void reconnect(FlowContext flowContext) {
         synchronized (lock) {
             try {
                 if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state()));
+                    debug(flowContext, "Reconnect (state: {}).", stateToString(flowContext.state()));
                 }
 
                 SessionInfo session = flowContext.sessionInfo();
@@ -331,13 +385,15 @@
                 error(flowContext, e.getMessage(), e);
             }
         }
-    }
+    }*/
 
     protected void closeAllConnections() {
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
-                disconnect(flowContext);
-            }
+        closeAllConnections(true);
+    }
+
+    protected void closeAllConnections(boolean wait) {
+        for (FlowContext flowContext : sessions.values()) {
+            disconnect(flowContext, wait);
         }
     }
 
@@ -368,45 +424,70 @@
 
     @Override
     public void writeMetrics(MetricsContainer container) {
-        synchronized (lock) {
-            super.writeMetrics(container);
-        }
+        super.writeMetrics(container);
     }
 
     @Override
     public void channelActive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                            context.getLocalAddress(),
-                            context.getRemoteAddress());
-                }
+        FlowContext flowContext = flowContext(context);
+        if (flowContext != null) {
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                        context.getLocalAddress(),
+                        context.getRemoteAddress());
+            }
 
+            flowContext.lock();
+            try {
                 context.setBidirectional(flowContext.isBidirectional());
                 flowContext.channelContext(context);
                 context.setAttachment(flowContext);
                 flowContext.connectionAttempts = 0;
                 flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
-                updateFlowState(flowContext, STATE_CONNECTED);
+                flowContext.state = STATE_CONNECTED;
+
+                try {
+                    onConnected(flowContext);
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, "Error occurred during onConnected calling.", e);
+                    }
+                }
+            } catch (Exception ex) {
+                error(flowContext, ex);
+            } finally {
+                flowContext.signal();
+                flowContext.unlock();
             }
-
-            lock.notifyAll();
         }
     }
 
     @Override
     public void channelInactive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = (FlowContext) context.getAttachment();
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Channel inactive.");
+        FlowContext flowContext = (FlowContext) context.getAttachment();
+        if (logger.isDebugEnabled()) {
+            debug(flowContext, "Channel inactive.");
+        }
+
+        flowContext.lock();
+        try {
+            flowContext.state = STATE_DISCONNECTED;
+            try {
+                onDisconnected(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Error occurred during onDisconnected calling.", e);
+                }
             }
 
-            updateFlowState(flowContext, STATE_DISCONNECTED);
-            lock.notifyAll();
+            sessions.remove(flowContext.session);
+        } finally {
+            flowContext.clear();
+            flowContext.signal();
+            flowContext.unlock();
         }
+
+
     }
 
     @Override
@@ -428,8 +509,9 @@
 
     @Override
     public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = (FlowContext) context.getAttachment();
+        flowContext.lock();
+        try {
             try {
                 FlowHandler client = flowContext.client();
                 FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
@@ -491,8 +573,9 @@
 
                 error(flowContext, FlowError.unknownError());
             }
-
-            lock.notifyAll();
+        } finally {
+            flowContext.signal();
+            flowContext.unlock();
         }
     }
 
@@ -512,9 +595,9 @@
     public void dataWritten(ChannelContext context) throws Exception {
         synchronized (lock) {
             FlowContext flowContext = (FlowContext) context.getAttachment();
-            if (flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
+            if (flowContext.isEventSent()) {
                 if (collectMetric) {
+                    long now = timeGenerator.currentTimeMillis();
                     synchronized (metric) {
                         metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
                     }
@@ -533,16 +616,17 @@
     @Override
     public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         if (logger.isDebugEnabled()) {
-            logger.debug("Error occurred. " + cause.getMessage(), cause);
+            logger.debug("Error occurred. ", cause);
         }
 
-        synchronized (lock) {
-            FlowContext flowContext = (FlowContext) context.getAttachment();
-            //Jezeli nie nastapilo polaczenie flowContext == null
-            if (flowContext == null) {
-                flowContext = flowContext(context);
-            }
+        FlowContext flowContext = (FlowContext) context.getAttachment();
+        //Jezeli nie nastapilo polaczenie flowContext == null
+        if (flowContext == null) {
+            flowContext = flowContext(context);
+        }
 
+        flowContext.lock.lock();
+        try {
             if (flowContext.state == STATE_CONNECTING) {
                 if (flowContext.connectionAttempts < maxConnectionAttempts) {
                     //TODO - malo optymalne, blokuje przetwarzanie eventow dla konkretnej sesji.
@@ -559,18 +643,20 @@
                     return;
                 }
             }
+        } finally {
+            flowContext.lock.unlock();
+        }
 
-            error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached."));
-            lock.notifyAll();
-        }
+        error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached."));
     }
 
-    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
-        synchronized (lock) {
+    protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) {
+        flowContext.lock();
+        try {
             Object req = event.getRequest();
             if (req != null) {
                 if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
-                    return false;
+                    return;
                 }
 
                 ByteBuff buffer;
@@ -589,7 +675,7 @@
                         error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached."));
                     }
 
-                    return false;
+                    return;
                 }
 
                 if (collectMetric) {
@@ -603,17 +689,90 @@
                     flowContext.channelContext().writeAndFlush(buffer);
                     requestSent0(flowContext, event);
                     buffer.clear();
-                    return true;
                 } catch (Exception e) {
                     flowContext.sendErrors++;
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
                     }
                 }
+
+                if (wait && flowContext.isBidirectional()) {
+                    waitForResponse(flowContext);
+                }
             }
+        } catch (Exception e) {
+            error(flowContext, e);
+        } finally {
+            flowContext.unlock();
+        }
+    }
+
+    protected boolean waitForResponse(FlowContext flowContext) throws InterruptedException {
+        return waitForResponse(flowContext, timeouts.getDefaultTimeout());
+    }
+
+    protected boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException {
+        long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
+        while (flowContext.sentEvent != null && flowContext.state == STATE_CONNECTED) {
+            if (timeNanos <= 0) {
+                return false;
+            }
+
+            timeNanos = flowContext.lockCond.awaitNanos(timeNanos);
         }
 
-        return false;
+        return true;
+    }
+
+    protected boolean waitOpFinished(FlowContext flowContext, byte neededFlags) throws InterruptedException {
+        return waitOpFinished(flowContext, neededFlags, timeouts.getDefaultTimeout());
+    }
+
+    protected boolean waitOpFinished(FlowContext flowContext, byte stateNeeded, long timeout) throws InterruptedException {
+        long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
+        while ((flowContext.state & stateNeeded) != stateNeeded && !flowContext.isError()) {
+            if (timeNanos <= 0) {
+                return false;
+            }
+
+            timeNanos = flowContext.lockCond.awaitNanos(timeNanos);
+        }
+
+        return true;
+    }
+
+    protected void processFlowSessionStatusEvent(SessionStatusEvent statusEvent, boolean wait) {
+        FlowContext flowContext = null;
+        try {
+            if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                flowContext = registerAndConnect(statusEvent.getSessionInfo(), wait);
+                flowContext.sessionEstablishedSeen = true;
+            } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                flowContext = flowContext(statusEvent);
+                if (flowContext != null) {
+                    disconnect(flowContext, wait);
+                }
+            }
+        } catch (Exception e) {
+            errorInternal(flowContext, e);
+        }
+    }
+
+    protected boolean mayReconnect(FlowContext flowContext) {
+        if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) {
+            return false;
+        }
+
+        return flowContext.error != null
+                && (flowContext.error.code() == CODE_CONNECTION_RESET_BY_PEER
+                || flowContext.error.code() == CODE_CONNECTION_CLOSED_UNEXPECTEDLY
+                || flowContext.error.code() == CODE_IDE_TIMEOUT);
+    }
+
+    protected boolean checkMayConnectIfPartial(FlowContext flowContext) {
+        return connectPartialSession
+                && !flowContext.sessionEstablishedSeen
+                && !flowContext.isError();
     }
 
     protected void processTimeouts() {
@@ -629,7 +788,7 @@
                             if (logger.isDebugEnabled()) {
                                 debug(flowContext, "Flow for session '{}' timed out (state '{}').",
                                         flowContext.sessionInfo(),
-                                        contextStateToString(flowContext.state()));
+                                        stateToString(flowContext.state()));
                             }
 
                             switch (flowContext.state()) {
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Apr 02 14:02:38 2020 +0200
@@ -95,11 +95,11 @@
         closeAllConnections = false;
     }
 
-    @Override
-    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
+/*    @Override
+    protected boolean send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) {
         //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy
         //najmniej uzywane.
-        if (flowIndex.size() > maxSentRequests) {
+       if (flowIndex.size() > maxSentRequests) {
             int diff = flowIndex.size() - maxSentRequests;
             if (logger.isDebugEnabled()) {
                 debug(flowContext, "Too many connections {}.", flowIndex.size());
@@ -119,7 +119,9 @@
         }
 
         return super.send(flowContext, event);
-    }
+
+
+    }*/
 
     private boolean canSend(FlowContext flowContext) {
         return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent();
@@ -158,7 +160,7 @@
             } else if (event.getType() == SessionPayloadEvent.TYPE
                     && canSend(flowContext)) {
                 localFlowContext.eventsQueue.poll();
-                send(flowContext, (SessionPayloadEvent) event);
+                send(flowContext, (SessionPayloadEvent) event, true);
             } else {
                 localFlowContext.eventsQueue.poll();
             }
@@ -230,7 +232,7 @@
                                     && (flowContext.state() == FlowContext.STATE_CONNECTED
                                     || flowContext.state() == FlowContext.STATE_ERROR
                                     || flowContext.isEventSent())) {
-                                send(flowContext, payloadEvent);
+                                send(flowContext, payloadEvent, true);
                             } else {
                                 addToQueue(flowContext, event);
                             }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Apr 02 14:02:38 2020 +0200
@@ -4,28 +4,24 @@
 import com.passus.commons.annotations.Plugin;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.plugin.PluginConstants;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
 
-import static com.passus.st.client.FlowContext.contextStateToString;
+import static com.passus.st.client.FlowContext.STATE_CONNECTED;
 
 @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
 public class SynchFlowWorker extends FlowWorkerBase {
 
+    protected final Logger logger = LogManager.getLogger(getClass());
+
     public static final String TYPE = "synch";
 
     private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
 
     private long eventsQueueWaitTime = 100;
 
-    /**
-     * Context dla ktorego wykonywana jest operacja.
-     */
-    private FlowContext currFlowContext;
-
-    private boolean loopEnd = false;
-
     private int loop = 0;
 
     public SynchFlowWorker(Emitter emitter, String name, int index) {
@@ -47,62 +43,12 @@
     }
 
     @Override
-    protected void flowStateChanged(FlowContext context, int oldState) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state()));
-        }
-
-        if (context == currFlowContext) {
-            if (context.state() == FlowContext.STATE_CONNECTED
-                    || context.state() == FlowContext.STATE_ERROR
-                    || context.state() == FlowContext.STATE_DISCONNECTED) {
-                currFlowContext = null;
-            }
-        }
-    }
-
-    @Override
     public void handle(Event event) {
         Event newEvent = eventInstanceForWorker(event);
-        synchronized (lock) {
-            try {
-                eventsQueue.put(newEvent);
-            } catch (Exception e) {
-                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            boolean wait;
-            do {
-                wait = false;
-                for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.isEventSent()) {
-                        wait = true;
-                        break;
-                    }
-                }
-
-                if (wait) {
-                    try {
-                        lock.wait(100);
-                    } catch (Exception e) {
-                    }
-                }
-            } while (wait);
-
-            super.closeAllConnections();
-            while (!sessions.isEmpty()) {
-                try {
-                    lock.wait(100);
-                } catch (Exception e) {
-                }
-            }
+        try {
+            eventsQueue.put(newEvent);
+        } catch (Exception e) {
+            logger.debug("Unable to add event to queue. ", e);
         }
     }
 
@@ -111,164 +57,84 @@
      *
      * @return boolean
      */
-    private boolean pollNext() {
-        Event event = eventsQueue.poll();
-        if (event != null) {
-            sleep(event);
-            if (logger.isTraceEnabled()) {
-                logger.trace("Event processing: {}", event);
+    private void process(Event event) {
+        sleep(event);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Event processing: {}", event);
+        }
+
+        if (event instanceof SessionEvent) {
+            SessionEvent sessEvent = (SessionEvent) event;
+            if (isBlockedSession(sessEvent.getSessionInfo())) {
+                return;
             }
 
-            if (event instanceof SessionEvent) {
-                SessionEvent sessEvent = (SessionEvent) event;
-                if (isBlockedSession(sessEvent.getSessionInfo())) {
-                    return true;
-                }
+            if (event.getType() == SessionStatusEvent.TYPE) {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
+                processFlowSessionStatusEvent(statusEvent, true);
+                return;
+            } else if (event.getType() == SessionPayloadEvent.TYPE) {
+                FlowContext flowContext = flowContext(sessEvent);
 
-                if (event.getType() == SessionStatusEvent.TYPE) {
-                    SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
-                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                        try {
-                            currFlowContext = register(statusEvent);
-                            if (currFlowContext != null) {
-                                currFlowContext.loop(loop);
-                                emitter.connect(statusEvent.getSessionInfo(), this, index);
-                            }
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
+                if (flowContext != null) {
+                    if (flowContext.blocked) {
+                        return;
+                    } else if (mayReconnect(flowContext)) {
+                        if (logger.isDebugEnabled()) {
+                            debug(flowContext, "Reconnecting.");
                         }
 
-                        return (currFlowContext == null);
-                    } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                        currFlowContext = flowContext((SessionEvent) event);
-                        if (currFlowContext != null) {
-                            if (!currFlowContext.isEventSent()) {
-                                disconnect(statusEvent);
-                            }
-                        }
-                    }
-
-                    return true;
-                } else if (event.getType() == SessionPayloadEvent.TYPE) {
-                    FlowContext flowContext = flowContext(sessEvent);
-                    if (flowContext.blocked) {
-                        return true;
+                        connect(flowContext, true);
                     }
-
-                    if (flowContext != null) {
-                        switch (flowContext.state()) {
-                            case FlowContext.STATE_CONNECTED:
-                            case FlowContext.STATE_ERROR:
-                                currFlowContext = flowContext;
-                                if (send(flowContext, (SessionPayloadEvent) event)) {
-                                    return false;
-                                } else {
-                                    currFlowContext = null;
-                                    return true;
-                                }
-                            case FlowContext.STATE_DISCONNECTING:
-                            case FlowContext.STATE_DISCONNECTED:
-                                if (connectPartialSession) {
-                                    currFlowContext = register(sessEvent);
-                                    if (currFlowContext != null) {
-                                        try {
-                                            currFlowContext.loop(loop);
-                                            emitter.connect(sessEvent.getSessionInfo(), this, index);
-                                        } catch (IOException e) {
-                                            logger.error(e.getMessage(), e);
-                                            currFlowContext = null;
-                                        }
-                                    }
-
-                                    return false;
-                                } else {
-                                    return true;
-                                }
-                            default:
-                                return false;
-                        }
-                    } else if (connectPartialSession) {
-                        currFlowContext = register(sessEvent);
-                        if (currFlowContext != null) {
-                            try {
-                                currFlowContext.loop(loop);
-                                emitter.connect(sessEvent.getSessionInfo(), this, index);
-                                eventsQueue.addFirst(sessEvent);
-                            } catch (IOException e) {
-                                logger.error(e.getMessage(), e);
-                                currFlowContext = null;
-                            }
-
-                            return false;
-                        } else {
-                            return true;
-                        }
-                    }
-
-                    return true;
-                } else {
-                    return true;
-                }
-            } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DataLoopEnd received.");
+                } else if (connectPartialSession) {
+                    flowContext = registerAndConnect(sessEvent.getSessionInfo(), true);
                 }
 
-                loopEnd = true;
-                closeAllConnections();
-                filterChain.reset();
-                if (currFlowContext != null) {
-                    loop = currFlowContext.loop() + 1;
+                if (flowContext.state == STATE_CONNECTED) {
+                    send(flowContext, (SessionPayloadEvent) event, true);
                 }
+            }
+        } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataLoopEnd received.");
+            }
 
-                loopEnd = false;
-                return true;
-            } else if (event.getType() == DataEvents.DataEnd.TYPE) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DataEnd received. Deactivation.");
-                }
+            closeAllConnections();
+            filterChain.reset();
+        } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataEnd received. Deactivation.");
+            }
 
-                working = false;
-            }
+            working = false;
         }
-
-        return false;
     }
 
     @Override
     public void disconnect() {
-        synchronized (lock) {
-            eventsQueue.clear();
-            super.disconnect();
-            lock.notifyAll();
-        }
+        super.disconnect();
+        eventsQueue.clear();
     }
 
     @Override
     public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    try {
-                        lock.wait(eventsQueueWaitTime);
-                    } catch (InterruptedException ignore) {
-                    }
+        working = true;
+        while (working) {
+            Event event = null;
+            try {
+                event = eventsQueue.take();
+            } catch (InterruptedException ignore) {
+            }
 
-                    boolean nextPoll;
-                    do {
-                        if (loopEnd || !working) {
-                            break;
-                        }
-
-                        nextPoll = pollNext();
-                    } while (nextPoll);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
+            try {
+                process(event);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
                 }
             }
         }
     }
+
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java	Thu Apr 02 14:02:38 2020 +0200
@@ -0,0 +1,45 @@
+package com.passus.st.client;
+
+public class Timeouts {
+
+    public static final long DEFAULT_TIMEOUT = 10_000L;
+    public static final long DEFAULT_CONNECTING_TIMEOUT = 10_000L;
+    public static final long DEFAULT_DISCONNECTING = 10_000L;
+
+    private long defaultTimeout = DEFAULT_TIMEOUT;
+    private long connectingTimeout = DEFAULT_CONNECTING_TIMEOUT;
+    private long disconnectingTimeout = DEFAULT_DISCONNECTING;
+
+    public Timeouts() {
+    }
+
+    public Timeouts(Timeouts timeouts) {
+        this.defaultTimeout = defaultTimeout;
+        this.connectingTimeout = connectingTimeout;
+        this.disconnectingTimeout = disconnectingTimeout;
+    }
+
+    public long getDefaultTimeout() {
+        return defaultTimeout;
+    }
+
+    public void setDefaultTimeout(long defaultTimeout) {
+        this.defaultTimeout = defaultTimeout;
+    }
+
+    public long getConnectingTimeout() {
+        return connectingTimeout;
+    }
+
+    public void setConnectingTimeout(long connectingTimeout) {
+        this.connectingTimeout = connectingTimeout;
+    }
+
+    public long getDisconnectingTimeout() {
+        return disconnectingTimeout;
+    }
+
+    public void setDisconnectingTimeout(long disconnectingTimeout) {
+        this.disconnectingTimeout = disconnectingTimeout;
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Thu Apr 02 14:02:38 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.commons.service.ServiceUtils;
 import com.passus.st.AbstractWireMockTest;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.emitter.RuleBasedSessionMapper;
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.utils.EventUtils;
@@ -19,7 +20,6 @@
 
 public class FlowExecutorTest extends AbstractWireMockTest {
 
-
     private NioEmitter prepareEmitter(String mapperRule) throws Exception {
         RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
         sessionMapper.addRule(mapperRule);
@@ -61,7 +61,7 @@
             flowExecutor.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
             events.forEach(flowExecutor::handle);
 
-            flowExecutor.join();
+            flowExecutor.join(2_000);
             assertTrue(listener.size() > 0);
             assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
             TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0);
@@ -94,7 +94,7 @@
 
             events.forEach(flowExecutor::handle);
 
-            flowExecutor.join();
+            flowExecutor.join(2_000);
 
             assertTrue(listener.size() > 0);
             assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Thu Apr 02 14:02:38 2020 +0200
@@ -126,13 +126,20 @@
     @Test
     public void testDisconnect() throws Exception {
         TestFlowWorker worker = worker();
-        makeConnected(worker, session);
+        ChannelContext channelContext = makeConnected(worker, session);
+        FlowContext flowContext = worker.flowContext(session);
+        when(channelContext.getAttachment()).thenReturn(flowContext);
 
-        FlowContext flowContext = worker.flowContext(session);
+        doAnswer((Answer<Void>) invocation -> {
+                    worker.channelInactive(channelContext);
+                    return null;
+                }
+        ).when(channelContext).close();
         worker.disconnect(session);
 
         assertEquals(FlowContext.STATE_DISCONNECTED, flowContext.state);
         assertEquals(null, flowContext.buffer);
+        assertNull(flowContext.error);
         assertNull(worker.flowContext(session));
     }
 
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Apr 02 14:02:38 2020 +0200
@@ -27,7 +27,6 @@
 
     public static final long JOIN_TIMEOUT = 1_000;
 
-
     private final TestHttpClientListener listener = new TestHttpClientListener();
 
     private static class LocalEmitter implements Emitter {
@@ -262,6 +261,11 @@
     }
 
     @Test
+    public void testConnectSuccess() {
+
+    }
+
+    @Test
     public void testHandle_HTTP_SimpleRequestResponse() throws Exception {
         List<Event> events = readDefaultEvents();
         SynchFlowWorker worker = createWorker();
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Apr 02 10:40:41 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Apr 02 14:02:38 2020 +0200
@@ -4,6 +4,7 @@
 import com.passus.commons.utils.ArrayUtils;
 import com.passus.net.netflow.Netflow9;
 import com.passus.net.netflow.Netflow9Decoder;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.Event;
 import com.passus.st.client.FlowExecutor;