changeset 1035:46067bb4f3ce

ParallelFlowWorker in progress
author Devel 2
date Tue, 07 Apr 2020 13:34:22 +0200
parents 51ef66fadb5c
children 692f56665b7a
files stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java
diffstat 15 files changed, 705 insertions(+), 671 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Tue Apr 07 13:34:22 2020 +0200
@@ -272,7 +272,7 @@
         }
 
         for (FlowWorker worker : workers) {
-            worker.disconnect();
+            worker.disconnectAll();
             worker.interrupt();
 
             try {
@@ -285,9 +285,9 @@
         started = false;
     }
 
-    public void closeAllConnections() {
+    public void disconnectAll() {
         for (FlowWorker worker : workers) {
-            worker.disconnect();
+            worker.disconnectAll();
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java	Tue Apr 07 13:34:22 2020 +0200
@@ -4,4 +4,6 @@
 
     FlowHandler create(int protocolId);
 
+    FlowHandler create(Object message);
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Tue Apr 07 13:34:22 2020 +0200
@@ -1,5 +1,10 @@
 package com.passus.st.client;
 
+import com.passus.net.dns.DnsRecord;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.net.netflow.Netflow;
+import com.passus.net.pgsql.PgSqlMessage;
 import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
 import com.passus.st.client.netflow.NetflowFlowHandler;
@@ -25,4 +30,23 @@
 
         throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
     }
+
+    @Override
+    public FlowHandler create(Object message) {
+        if (message == null) {
+            throw new NullPointerException();
+        }
+
+        if (message instanceof HttpRequest || message instanceof HttpResponse) {
+            return new HttpFlowHandler();
+        } else if (message instanceof DnsRecord) {
+            return new DnsFlowHandler();
+        } else if (message instanceof Netflow) {
+            return new NetflowFlowHandler();
+        } else if (message instanceof PgSqlMessage) {
+            return new PgSqlFlowHandlerNetflowFlowHandler();
+        }
+
+        throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'.");
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Tue Apr 07 13:34:22 2020 +0200
@@ -314,6 +314,10 @@
                     }
 
                     Object resp = decoder.getResult();
+                    if (logger.isDebugEnabled()) {
+                        debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + ".");
+                    }
+
                     Object req = null;
                     if (flowContext.sentEvent() != null) {
                         req = flowContext.sentEvent().getRequest();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java	Tue Apr 07 13:34:22 2020 +0200
@@ -31,16 +31,28 @@
 
     FlowMetric getFlowMetric();
 
-    void onConnected(FlowContext flowContext);
-
-    void onRequestSent(FlowContext flowContext, SessionPayloadEvent event);
+    default void onConnected(FlowContext flowContext) {
 
-    void onResponseReceived(FlowContext flowContext, Object response);
+    }
 
-    void onDisconnecting(FlowContext flowContext);
+    default void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) {
 
-    void onDisconnected(FlowContext flowContext);
+    }
 
-    void onError(FlowContext flowContext);
+    default void onResponseReceived(FlowContext flowContext, Object response) {
+
+    }
+
+    default void onDisconnecting(FlowContext flowContext) {
+
+    }
+
+    default void onDisconnected(FlowContext flowContext) {
+
+    }
+
+    default void onError(FlowContext flowContext) {
+
+    }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Tue Apr 07 13:34:22 2020 +0200
@@ -186,9 +186,17 @@
 
     public abstract int activeConnections();
 
-    public abstract void disconnect();
+    public void disconnectAll() {
+        disconnectAll(true);
+    }
 
-    public abstract void disconnect(SessionInfo session);
+    public abstract void disconnectAll(boolean wait);
+
+    public void disconnect(SessionInfo session) {
+        disconnect(session, true);
+    }
+
+    public abstract void disconnect(SessionInfo session, boolean wait);
 
     public abstract void handle(Event event);
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Tue Apr 07 13:34:22 2020 +0200
@@ -222,7 +222,12 @@
     }
 
     @Override
-    public void disconnect() {
+    public void disconnect(SessionInfo session, boolean wait) {
+
+    }
+
+    @Override
+    public void disconnectAll(boolean wait) {
         for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
             FlowContext flowContext = entry.getValue();
             try {
--- a/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java	Tue Apr 07 13:34:22 2020 +0200
@@ -20,12 +20,12 @@
     }
 
     @Override
-    public void disconnect() {
+    public void disconnectAll(boolean wait) {
 
     }
 
     @Override
-    public void disconnect(SessionInfo session) {
+    public void disconnect(SessionInfo session, boolean wait) {
 
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Apr 07 13:34:22 2020 +0200
@@ -5,38 +5,39 @@
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.filter.FlowFilterChain;
 import com.passus.st.plugin.PluginConstants;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Semaphore;
 
-import static com.passus.st.client.FlowContext.*;
-import static com.passus.st.client.FlowUtils.waitOpFinished;
+import static com.passus.st.client.FlowContext.STATE_CONNECTED;
+import static com.passus.st.client.FlowUtils.*;
 
 @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
-public class ParallelFlowWorker  {
+public class ParallelFlowWorker extends FlowWorker {
 
-   public static final String TYPE = "parallel";
+    public static final String TYPE = "parallel";
 
-     /*public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
+    public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
+    public static final int DEFAULT_FLOW_EVENTS_QUEUE = 100;
 
     private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
 
+    protected final Map<SessionInfo, FlowThread> sessions = new ConcurrentHashMap<>();
+
     private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS;
 
     private long eventsQueueWaitTime = 100;
 
-    private final Semaphore semaphore = new Semaphore(maxSentRequests);
+    private int flowEventsQueue = DEFAULT_FLOW_EVENTS_QUEUE;
 
-    private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>();
-
-    private boolean closeAllConnections = false;
+    private volatile boolean working;
 
     public ParallelFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
@@ -51,20 +52,6 @@
         this.maxSentRequests = maxSentRequests;
     }
 
-    @Override
-    protected LocalFlowContext flowContext(SessionInfo session) {
-        return (LocalFlowContext) super.flowContext(session);
-    }
-
-    @Override
-    protected LocalFlowContext flowContext(ChannelContext context) {
-        return flowContext(context.getSessionInfo());
-    }
-
-    @Override
-    protected LocalFlowContext flowContext(SessionEvent event) {
-        return flowContext(event.getSessionInfo());
-    }
 
     public long getEventsQueueWaitTime() {
         return eventsQueueWaitTime;
@@ -75,180 +62,141 @@
         this.eventsQueueWaitTime = eventsQueueWaitTime;
     }
 
-    protected void removeFlowContext(FlowContext flowContext) {
-        if (flowContext != null) {
-            flowIndex.remove(flowContext);
-        }
+    @Override
+    public boolean isWorking() {
+        return working;
     }
 
     @Override
-    protected LocalFlowContext createFlowContext(SessionInfo session) {
-        LocalFlowContext flowContext = new LocalFlowContext(session);
-        flowIndex.add(flowContext);
-        return flowContext;
+    public int activeConnections() {
+        return 0;
     }
 
-    private void waitCloseAllConnections() {
-        *//*closeAllConnections = true;
-        synchronized (lock) {
-            while (!flowIndex.isEmpty()) {
-                try {
-                    lock.wait(10);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        }
-
-        closeAllConnections = false;*//*
-        throw new RuntimeException("Not implemented.");
-    }
-
-*//*    @Override
-    protected boolean send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) {
-        //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy
-        //najmniej uzywane.
-       if (flowIndex.size() > maxSentRequests) {
-            int diff = flowIndex.size() - maxSentRequests;
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Too many connections {}.", flowIndex.size());
-            }
-
-            Iterator<LocalFlowContext> it = flowIndex.descendingIterator();
-            while (it.hasNext()) {
-                LocalFlowContext indexFlowContext = it.next();
-                if (indexFlowContext.eventsQueue.isEmpty()
-                        && !indexFlowContext.isEventSent()) {
-                    disconnect(flowContext);
-                    if (--diff == 0) {
-                        break;
-                    }
+    @Override
+    public void disconnectAll(boolean wait) {
+        if (wait) {
+            for (; ; ) {
+                int size = eventsQueue.size();
+                if (size == 0
+                        || (size == 1 && eventsQueue.peek().getType() == DataEvents.DataEnd.TYPE)) {
+                    break;
                 }
             }
         }
 
-        return super.send(flowContext, event);
-
+        sessions.forEach((k, flowThread) -> {
+            flowThread.disconnect(wait);
 
-    }*//*
+            flowThread.working = false;
+            flowThread.interrupt();
+            try {
+                flowThread.join();
+            } catch (InterruptedException ignore) {
 
-    private boolean canSend(FlowContext flowContext) {
-        return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent();
+            }
+        });
+
     }
 
     @Override
-    protected void flowStateChanged(FlowContext flowContext, int oldState) {
-        LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
-        *//*if (oldState == FlowContext.STATE_REQ_SENT) {
-            if (semaphore.availablePermits() <= maxSentRequests) {
-                semaphore.release();
-            }
-        }*//*
+    public void disconnect(SessionInfo session, boolean wait) {
+        FlowThread flowThread = flowThread(session);
+        if (flowThread != null) {
+            flowThread.disconnect(wait);
+        }
+    }
 
-        if (closeAllConnections) {
-            if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                    && !localFlowContext.isEventSent()
-                    && localFlowContext.eventsQueue.isEmpty()) {
-                disconnect(flowContext);
-                return;
+    private FlowThread register(SessionInfo session) {
+        if (sessions.containsKey(session)) {
+            logger.warn("Unable to register session '" + session + "'. Session already registered.");
+            return null;
+        }
+
+        FlowContext flowContext = new FlowContext(session);
+        flowContext.createLock();
+        FlowHandler client = clientFactory.create(session.getProtocolId());
+        client.init(flowContext);
+        flowContext.client(client);
+
+        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue);
+        flowThread.start();
+        sessions.put(session, flowThread);
+        return flowThread;
+    }
+
+    protected FlowThread flowThread(SessionEvent event) {
+        return flowThread(event.getSessionInfo());
+    }
+
+    protected FlowThread flowThread(ChannelContext context) {
+        return flowThread(context.getSessionInfo());
+    }
+
+    protected FlowThread flowThread(SessionInfo session) {
+        FlowThread flowThread = sessions.get(session);
+        if (flowThread == null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Context for session '" + session + "' not found.");
             }
         }
 
-        if (localFlowContext.state() >= FlowContext.STATE_CONNECTED
-                && localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                && !localFlowContext.isEventSent()
-                && !localFlowContext.eventsQueue.isEmpty()) {
+        return flowThread;
+    }
 
-            Event event = localFlowContext.eventsQueue.peek();
-            if (event.getType() == SessionStatusEvent.TYPE) {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                    localFlowContext.eventsQueue.poll();
-                    disconnect((SessionStatusEvent) event);
-                }
-            } else if (event.getType() == SessionPayloadEvent.TYPE
-                    && canSend(flowContext)) {
-                localFlowContext.eventsQueue.poll();
-                send(flowContext, (SessionPayloadEvent) event, true);
-            } else {
-                localFlowContext.eventsQueue.poll();
+    private void waitNoSessions() {
+        while (!sessions.isEmpty()) {
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ignore) {
+
             }
         }
     }
 
-    private void makeFirst(LocalFlowContext flowContext) {
-        flowIndex.remove(flowContext);
-        flowIndex.addFirst(flowContext);
-    }
-
-    private void addToQueue(LocalFlowContext flowContext, Event event) {
-        flowContext.eventsQueue.add(event);
-        makeFirst(flowContext);
-    }
-
     @Override
     public void handle(Event event) {
-        Event newEvent = null;
-        switch (event.getType()) {
-            case SessionPayloadEvent.TYPE:
-                semaphore.acquireUninterruptibly();
-                newEvent = eventInstanceForWorker(event);
-                break;
-            case SessionStatusEvent.TYPE:
-            case DataEvents.DataLoopEnd.TYPE:
-            case DataEvents.DataEnd.TYPE:
-                newEvent = event;
-        }
-
-        if (newEvent != null) {
-            try {
-                eventsQueue.put(newEvent);
-            } catch (Exception e) {
-                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-            }
+        try {
+            Event newEvent = eventInstanceForWorker(event, index);
+            eventsQueue.put(newEvent);
+        } catch (Exception e) {
+            logger.debug("Unable to add event to queue. " + e.getMessage(), e);
         }
     }
 
     private void processEvent(Event event) {
+        if (trace) {
+            logger.trace("Event processing: {}", event);
+        }
+
         if (event instanceof SessionEvent) {
             switch (event.getType()) {
-                case SessionStatusEvent.TYPE:
+                case SessionStatusEvent.TYPE: {
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                        LocalFlowContext flowContext = flowContext((SessionEvent) event);
-                        if (flowContext != null) {
-                            if (flowContext.eventsQueue.isEmpty()
-                                    && !flowContext.isEventSent()) {
-                                disconnect(statusEvent);
-                            } else {
-                                addToQueue(flowContext, event);
-                            }
+                    FlowThread flowThread = flowThread(statusEvent);
+                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                        if (flowThread == null) {
+                            flowThread = register(statusEvent.getSessionInfo());
                         }
                     }
+
+                    if (flowThread != null) {
+                        flowThread.handle(event);
+                    }
+
                     break;
+                }
                 case SessionPayloadEvent.TYPE: {
                     SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
-                    LocalFlowContext flowContext = flowContext(payloadEvent);
-                    if (flowContext != null) {
-                        if (flowContext.state() >= FlowContext.STATE_CONNECTING
-                                && flowContext.state() < FlowContext.STATE_DISCONNECTING) {
-                            if (flowContext.eventsQueue.isEmpty()
-                                    && (flowContext.state() == FlowContext.STATE_CONNECTED
-                                    || flowContext.state() == FlowContext.STATE_ERROR
-                                    || flowContext.isEventSent())) {
-                                send(flowContext, payloadEvent, true);
-                            } else {
-                                addToQueue(flowContext, event);
-                            }
-                        }
-                    } else {
-                        try {
-                            SessionInfo session = payloadEvent.getSessionInfo();
-                            flowContext = (LocalFlowContext) register(session);
-                            addToQueue(flowContext, event);
-                            emitter.connect(session, this, index);
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                        }
+                    FlowThread flowThread = flowThread(payloadEvent);
+
+                    if (connectPartialSession) {
+                        SessionInfo sessionInfo = payloadEvent.getSessionInfo();
+                        flowThread = register(sessionInfo);
+                        flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo));
+                    }
+
+                    if (flowThread != null) {
+                        flowThread.handle(event);
                     }
 
                     break;
@@ -260,7 +208,11 @@
                 logger.debug("DataLoopEnd received.");
             }
 
-            waitCloseAllConnections();
+            sessions.forEach((session, flowThread) -> {
+                flowThread.handle(event);
+            });
+
+            waitNoSessions();
             filterChain.reset();
         } else if (event.getType() == DataEvents.DataEnd.TYPE) {
             if (logger.isDebugEnabled()) {
@@ -289,110 +241,179 @@
         }
     }
 
-    protected class FlowThread extends Thread {
+    class FlowThread extends Thread implements FlowProcessorSupervisor {
+
+        private final Logger logger = LogManager.getLogger(FlowThread.class);
+
+        private volatile boolean working;
+
+        private final Emitter emitter;
 
         private final FlowContext flowContext;
 
-        private final Timeouts timeouts;
-
-        private final BlockingQueue<Event> events;
+        private final FlowProcessor flowProcessor;
 
-        public FlowThread(FlowContext flowContext, Timeouts timeouts, int queueSize) {
+        private final BlockingQueue<Event> queue;
+
+        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize) {
+            this.emitter = emitter;
             this.flowContext = flowContext;
-            this.timeouts = timeouts;
-            this.events = new ArrayBlockingQueue<>(queueSize);
+            flowProcessor = new FlowProcessor(this, logger, index);
+            this.queue = new ArrayBlockingQueue<>(queueSize);
         }
 
-        protected void connect(FlowContext flowContext, boolean wait) {
-            flowContext.lock();
+        public FlowContext flowContext() {
+            return flowContext;
+        }
+
+        @Override
+        public Emitter getEmitter() {
+            return emitter;
+        }
+
+        @Override
+        public Timeouts getTimeouts() {
+            return timeouts;
+        }
+
+        @Override
+        public FlowFilterChain getFilterChain() {
+            return filterChain;
+        }
+
+        @Override
+        public int getMaxEncoderErrors() {
+            return maxEncoderErrors;
+        }
+
+        @Override
+        public int getMaxConnectionAttempts() {
+            return maxConnectionAttempts;
+        }
+
+        @Override
+        public int getMaxSendErrors() {
+            return maxSendErrors;
+        }
+
+        @Override
+        public long getReconnectDelay() {
+            return reconnectDelay;
+        }
+
+        @Override
+        public FlowContext flowContext(SessionInfo session) {
+            return flowContext;
+        }
+
+        @Override
+        public boolean isCollectMetrics() {
+            return false;
+        }
+
+        @Override
+        public FlowMetric getFlowMetric() {
+            return null;
+        }
+
+        @Override
+        public void onResponseReceived(FlowContext flowContext, Object response) {
+            fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext);
+        }
+
+        public void disconnect(boolean wait) {
+            if (wait) {
+                while (!queue.isEmpty()) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException ignore) {
+
+                    }
+                }
+            }
+
+            flowProcessor.disconnect(flowContext, wait);
+            working = false;
+            interrupt();
+
             try {
-                flowContext.connectionAttempts++;
-                flowContext.state = STATE_CONNECTING;
-                emitter.connect(flowContext.session, this, index);
-                if (wait) {
-                    waitOpFinished(flowContext, STATE_CONNECTED);
-                }
-            } catch (Exception ex) {
-                error(flowContext, ex);
-            } finally {
-                flowContext.signalAndUnlock();
+                join();
+            } catch (InterruptedException e) {
+
             }
         }
 
-        protected void disconnect(FlowContext flowContext) {
-            disconnect(flowContext, true);
-        }
-
-        protected void disconnect(FlowContext flowContext, boolean wait) {
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Disconnect.");
-            }
-
-            long now = timeGenerator.currentTimeMillis();
-            flowContext.lock();
+        public void handle(Event event) {
             try {
-                if (trace) {
-                    debug(flowContext, "Disconnecting.");
-                }
-
-                if (flowContext.state == STATE_DISCONNECTING
-                        || flowContext.state == STATE_DISCONNECTED) {
-                    return;
-                }
-
-                flowContext.state = STATE_DISCONNECTING;
-                flowContext.timeout = now + timeouts.getDisconnectingTimeout();
-
-                try {
-                    onDisconnecting(flowContext);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, "Error occurred during onDisconnecting calling.", e);
-                    }
-                }
-
-                if (flowContext.channelContext() != null) {
-                    try {
-                        flowContext.channelContext().close();
-                    } catch (Exception e) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug(e.getMessage(), e);
-                        }
-                    }
-                }
-
-                if (wait) {
-                    waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout());
-                }
-            } catch (InterruptedException e) {
-                error(flowContext, e);
-            } finally {
-                flowContext.signalAndUnlock();
+                queue.put(event);
+            } catch (Exception e) {
+                debug(logger, flowContext, "Unable to add event to queue. ", e);
             }
         }
 
-        protected void error(FlowContext flowContext, Throwable cause) {
-            error(flowContext, FlowError.interpret(cause));
+        private void finish() {
+            flowProcessor.disconnect(flowContext, true);
+            working = false;
+
         }
 
-        protected void error(FlowContext flowContext, FlowError error) {
-            if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) {
-                disconnect(flowContext, false);
+        @Override
+        public void run() {
+            working = true;
+            if (trace) {
+                debug(logger, flowContext, "Flow thread started.");
             }
 
-            flowContext.error(error);
+            while (working) {
+                Event event = null;
+                try {
+                    event = queue.take();
+                } catch (InterruptedException ignore) {
+
+                }
+
+                if (event != null) {
+                    if (trace) {
+                        trace(logger, flowContext, "Event processing: {}", event);
+                    }
+
+                    if (event.getType() == SessionStatusEvent.TYPE) {
+                        SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                        if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                            flowProcessor.connect(flowContext, true);
+                        } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                            finish();
+                        }
+                    } else if (event.getType() == SessionPayloadEvent.TYPE) {
+                        if (flowContext.blocked) {
+                            continue;
+                        } else if (mayReconnect(flowContext)) {
+                            if (logger.isDebugEnabled()) {
+                                debug(logger, flowContext, "Reconnecting.");
+                            }
+
+                            flowProcessor.connect(flowContext, true);
+                        }
+
+                        if (flowContext.state == STATE_CONNECTED) {
+                            flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
+                        }
+                    } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+                        if (logger.isDebugEnabled()) {
+                            debug(logger, flowContext, "DataLoopEnd received.");
+                        }
+
+                        finish();
+                    }
+                }
+            }
+
+            if (trace) {
+                debug(logger, flowContext, "Flow thread stopped.");
+            }
+
+            sessions.remove(flowContext.session);
         }
     }
 
-    protected class LocalFlowContext extends FlowContext {
-
-        private final Queue<Event> eventsQueue;
-
-        private LocalFlowContext(SessionInfo session) {
-            super(session);
-            eventsQueue = new LinkedList<>();
-        }
-
-    }*/
-
 }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Tue Apr 07 13:34:22 2020 +0200
@@ -3,7 +3,6 @@
 import com.passus.st.emitter.SessionInfo;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class SessionStatusEvent extends SessionEvent {
@@ -68,6 +67,14 @@
                 + '}';
     }
 
+    public static SessionStatusEvent openingEvent(SessionInfo sessionInfo) {
+        return new SessionStatusEvent(sessionInfo, STATUS_OPENING);
+    }
+
+    public static SessionStatusEvent establishedEvent(SessionInfo sessionInfo) {
+        return new SessionStatusEvent(sessionInfo, STATUS_ESTABLISHED);
+    }
+
     @Override
     public SessionStatusEvent instanceForWorker(int index) {
         return new SessionStatusEvent(getSessionInfo(), status, getSourceName());
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Apr 07 13:34:22 2020 +0200
@@ -62,12 +62,6 @@
         this.eventsQueueWaitTime = eventsQueueWaitTime;
     }
 
-    protected FlowContext createFlowContext(SessionInfo session) {
-        FlowContext flowContext = new FlowContext(session);
-        flowContext.createLock();
-        return flowContext;
-    }
-
     @Override
     public int activeConnections() {
         int count = 0;
@@ -95,8 +89,9 @@
             return null;
         }
 
-        FlowContext flowContext = createFlowContext(session);
-        //TODO Malo optymalne
+        FlowContext flowContext = new FlowContext(session);
+        flowContext.createLock();
+
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
         flowContext.client(client);
@@ -139,32 +134,12 @@
         return flowContext;
     }
 
-    protected void disconnectAllConnections(boolean wait) {
-        for (FlowContext flowContext : sessions.values()) {
-            flowProcessor.disconnect(flowContext, wait);
-        }
-    }
-
     @Override
-    public void disconnect(SessionInfo session) {
-        try {
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
-                flowProcessor.disconnect(flowContext, true);
-            }
-        } catch (Exception e) {
-            if (logger.isDebugEnabled()) {
-                logger.debug(e.getMessage(), e);
-            }
-        }
-    }
-
-    @Override
-    public void disconnect() {
+    public void disconnectAll(boolean wait) {
         for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
             FlowContext flowContext = entry.getValue();
             try {
-                flowProcessor.disconnect(flowContext, true);
+                flowProcessor.disconnect(flowContext, wait);
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     debug(logger, flowContext, e.getMessage(), e);
@@ -178,6 +153,14 @@
     }
 
     @Override
+    public void disconnect(SessionInfo session, boolean wait) {
+        FlowContext flowContext = flowContext(session);
+        if (flowContext != null) {
+            flowProcessor.disconnect(flowContext, wait);
+        }
+    }
+
+    @Override
     public void handle(Event event) {
         Event newEvent = eventInstanceForWorker(event, index);
         try {
@@ -230,7 +213,7 @@
      */
     private void process(Event event) {
         sleep(event);
-        if (logger.isTraceEnabled()) {
+        if (trace) {
             logger.trace("Event processing: {}", event);
         }
 
@@ -260,7 +243,7 @@
                     flowContext = registerAndConnect(sessEvent.getSessionInfo(), true);
                 }
 
-                if (flowContext.state == STATE_CONNECTED) {
+                if (flowContext != null && flowContext.state == STATE_CONNECTED) {
                     flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
                 }
             }
@@ -269,7 +252,7 @@
                 logger.debug("DataLoopEnd received.");
             }
 
-            disconnectAllConnections(true);
+            disconnectAll();
             filterChain.reset();
         } else if (event.getType() == DataEvents.DataEnd.TYPE) {
             if (logger.isDebugEnabled()) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java	Tue Apr 07 13:34:22 2020 +0200
@@ -0,0 +1,316 @@
+package com.passus.st.client;
+
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.data.ByteBuff;
+import com.passus.data.DataEncoder;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.net.http.HttpResponseEncoder;
+import com.passus.st.Log4jConfigurationFactory;
+import com.passus.st.client.SynchFlowWorker.SynchWrapper;
+import com.passus.st.emitter.*;
+import com.passus.st.metric.MetricsContainer;
+import com.passus.st.utils.EventUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static com.passus.st.utils.Assert.assertHttpClientEvents;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+
+public abstract class AbstractFlowWorkerTest {
+
+    static {
+        Log4jConfigurationFactory.enableFactory("trace");
+    }
+
+    //public static final long JOIN_TIMEOUT = 10_000;
+    public static final long JOIN_TIMEOUT = Long.MAX_VALUE;
+
+    protected final TestHttpClientListener listener = new TestHttpClientListener();
+
+    protected class LocalEmitter implements Emitter {
+
+        private final Logger LOGGER = LogManager.getLogger(LocalEmitter.class);
+
+        private SessionMapper sessionMapper;
+
+        private boolean started = false;
+
+        private final DataEncoder encoder;
+
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        public LocalEmitter() {
+            this(HttpResponseEncoder.INSTANCE);
+        }
+
+        public LocalEmitter(DataEncoder encoder) {
+            this.encoder = encoder;
+        }
+
+        @Override
+        public void setSessionMapper(SessionMapper sessionMapper) {
+            this.sessionMapper = sessionMapper;
+        }
+
+        @Override
+        public SessionMapper getSessionMapper() {
+            return sessionMapper;
+        }
+
+        @Override
+        public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
+            LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
+            try {
+                handler.channelRegistered(channelContext);
+                handler.channelActive(channelContext);
+            } catch (Exception ex) {
+                LOGGER.debug(ex.getMessage(), ex);
+            }
+        }
+
+        protected void flush(LocalChannelContext channelContext) {
+            FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler;
+            SessionPayloadEvent event = extractSentEvent(channelContext);
+            HttpResponse response = (HttpResponse) event.getResponse();
+
+            ByteBuff buff = new HeapByteBuff();
+            encoder.encode(response, buff);
+
+            executor.execute(() -> {
+                try {
+                    flowProcessor.dataReceived(channelContext, buff);
+                } catch (Exception ex) {
+                    LOGGER.error(ex.getMessage(), ex);
+                }
+            });
+        }
+
+        protected void close(LocalChannelContext channelContext) {
+            try {
+                channelContext.handler.channelInactive(channelContext);
+                channelContext.handler.channelUnregistered(channelContext);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        @Override
+        public boolean isStarted() {
+            return started;
+        }
+
+        @Override
+        public void start() {
+            if (sessionMapper == null) {
+                sessionMapper = new PassThroughSessionMapper();
+            }
+
+            started = true;
+        }
+
+        @Override
+        public void stop() {
+            started = false;
+        }
+
+        @Override
+        public boolean isCollectMetrics() {
+            return false;
+        }
+
+        @Override
+        public void setCollectMetrics(boolean collectMetrics) {
+        }
+
+        @Override
+        public void writeMetrics(MetricsContainer container) {
+        }
+
+        @Override
+        public void configure(Configuration config, ConfigurationContext context) {
+        }
+    }
+
+    protected static class LocalChannelContext implements ChannelContext {
+
+        protected final LocalEmitter emitter;
+
+        protected final EmitterHandler handler;
+
+        protected final SessionInfo sessionInfo;
+
+        protected final Queue<ByteBuffer> dataQueue;
+
+        protected SocketAddress localAddress;
+
+        protected SocketAddress remoteAddress;
+
+        protected boolean bidirectional = true;
+
+        private FlowContext flowContext;
+
+        public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+            this.emitter = emitter;
+            this.handler = handler;
+            this.remoteAddress = remoteAddress;
+            this.sessionInfo = sessionInfo;
+            this.dataQueue = new LinkedList<>();
+        }
+
+        @Override
+        public boolean isBidirectional() {
+            return bidirectional;
+        }
+
+        @Override
+        public void setBidirectional(boolean unidirectional) {
+            this.bidirectional = unidirectional;
+        }
+
+        @Override
+        public boolean isConnected() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public boolean isConnectionPending() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        private void addToQeueu(ByteBuffer buffer) throws IOException {
+            dataQueue.add(buffer);
+        }
+
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            addToQeueu(ByteBuffer.wrap(data, offset, length));
+        }
+
+        @Override
+        public void write(ByteBuff data) throws IOException {
+            addToQeueu(data.toNioByteBuffer());
+        }
+
+        @Override
+        public void flush() throws IOException {
+            emitter.flush(this);
+        }
+
+        @Override
+        public void close() throws IOException {
+            emitter.close(this);
+        }
+
+        @Override
+        public SocketAddress getLocalAddress() {
+            return localAddress;
+        }
+
+        @Override
+        public SocketAddress getRemoteAddress() {
+            return remoteAddress;
+        }
+
+        @Override
+        public SessionInfo getSessionInfo() {
+            return sessionInfo;
+        }
+
+        @Override
+        public FlowContext getFlowContext() {
+            return flowContext;
+        }
+
+        @Override
+        public void setFlowContext(FlowContext flowContext) {
+            this.flowContext = flowContext;
+        }
+    }
+
+    protected FlowWorker createWorker() {
+        return createWorker(new LocalEmitter());
+    }
+
+    protected abstract FlowWorker createWorker(Emitter emitter);
+
+    protected abstract SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext);
+
+    private List<Event> readEvents(String pcapFile) throws Exception {
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
+        return EventUtils.readEvents(pcapFile, props);
+    }
+
+    @AfterMethod
+    public void afterMethod() {
+        listener.clear();
+    }
+
+    private void join(FlowWorker worker) {
+        try {
+            worker.join(JOIN_TIMEOUT);
+        } catch (InterruptedException ignore) {
+
+        }
+
+        assertFalse("Worker is still working.", worker.isWorking());
+    }
+
+    private List<Event> readDefaultEvents() throws Exception {
+        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
+        assertEquals(4, events.size());
+        return events;
+    }
+
+    @Test
+    public void testHandle_HTTP_SimpleRequestResponse() throws Exception {
+        List<Event> events = readDefaultEvents();
+        FlowWorker worker = createWorker();
+        worker.start();
+        SessionEvent sessionEvent = (SessionEvent) events.get(0);
+        worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
+        events.forEach(worker::handle);
+        join(worker);
+
+        assertHttpClientEvents(events, listener.events());
+    }
+
+    @Test
+    public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception {
+        List<Event> events = readDefaultEvents();
+        FlowWorker worker = createWorker();
+        worker.setConnectPartialSession(true);
+        worker.start();
+        events.forEach(worker::handle);
+        join(worker);
+        assertHttpClientEvents(events, listener.events());
+    }
+
+    @Test
+    public void testHandle_EmitterException_SendErrorsNotReached() throws Exception {
+        List<Event> events = readDefaultEvents();
+        LocalEmitter emitter = new LocalEmitter((object, out) -> {
+            throw new RuntimeException("Test exception");
+        });
+
+        FlowWorker worker = createWorker(emitter);
+        worker.setConnectPartialSession(true);
+        worker.start();
+        events.forEach(worker::handle);
+        join(worker);
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Tue Apr 07 13:34:22 2020 +0200
@@ -60,7 +60,6 @@
                 })
         ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt());
         worker.connect(session);
-        FlowContext flowContext = worker.flowContext(session);
         return channelContext;
     }
 
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Tue Apr 07 13:34:22 2020 +0200
@@ -1,65 +1,20 @@
 package com.passus.st.client;
 
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.emitter.RuleBasedSessionMapper;
-import com.passus.st.emitter.nio.NioEmitter;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.*;
-
-public class ParallelFlowWorkerTest extends AbstractWireMockTest {
+import com.passus.st.client.ParallelFlowWorker.FlowThread;
+import com.passus.st.emitter.Emitter;
 
-    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
-        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
-        sessionMapper.addRule(mapperRule);
+public class ParallelFlowWorkerTest extends AbstractFlowWorkerTest {
 
-        NioEmitter emitter = new NioEmitter();
-        emitter.setSessionMapper(sessionMapper);
-        return emitter;
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
-        String content = "test";
-        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
-                .willReturn(aResponse()
-                        .withHeader("Content-Type", "text/plain")
-                        .withHeader("Content-Length", "" + content.length())
-                        .withBody(content)));
+    @Override
+    protected FlowWorker createWorker(Emitter emitter) {
+        ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0);
+        worker.setListener(listener);
+        return worker;
     }
 
-    @Test(enabled = false)
-    public void testHandle() throws Exception {
-       /* Map<String, Object> props = new HashMap<>();
-        props.put("allowPartialSession", true);
-        props.put("ports", 4214);
-        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
-        assertEquals(4, events.size());
-
-        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
-        emitter.start();
-
-        TestHttpClientListener listner = new TestHttpClientListener();
-
-        ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0);
-        try {
-            worker.setListener(listner);
-            worker.start();
-
-            events.forEach(worker::handle);
-
-            worker.join(2_000);
-            assertTrue(listner.size() > 0);
-            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
-            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
-            String responseStr = event.getResponse().toString();
-            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
-            assertTrue(responseStr.endsWith("test"));
-        } finally {
-            ServiceUtils.stopQuietly(emitter);
-        }*/
-
+    protected SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext) {
+        FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler;
+        FlowThread flowThread = (FlowThread) flowProcessor.getSupervisor();
+        return flowThread.flowContext().sentEvent();
     }
-
 }
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Mon Apr 06 14:13:39 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Tue Apr 07 13:34:22 2020 +0200
@@ -1,326 +1,24 @@
 package com.passus.st.client;
 
-import com.passus.config.Configuration;
-import com.passus.config.ConfigurationContext;
-import com.passus.data.ByteBuff;
-import com.passus.data.DataEncoder;
-import com.passus.data.HeapByteBuff;
-import com.passus.net.SocketAddress;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.HttpResponseEncoder;
-import com.passus.st.Log4jConfigurationFactory;
-import com.passus.st.client.SynchFlowWorker.SynchWrapper;
-import com.passus.st.emitter.*;
-import com.passus.st.metric.MetricsContainer;
-import com.passus.st.utils.EventUtils;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.passus.st.utils.Assert.assertHttpClientEvents;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
-
-public class SynchFlowWorkerTest {
-    
-    public static final long JOIN_TIMEOUT = Long.MAX_VALUE;
-
-    private final TestHttpClientListener listener = new TestHttpClientListener();
-
-    private static class LocalEmitter implements Emitter {
-
-        private SessionMapper sessionMapper;
-
-        private boolean started = false;
-
-        private final DataEncoder encoder;
-
-        private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
-        public LocalEmitter() {
-            this(HttpResponseEncoder.INSTANCE);
-        }
-
-        public LocalEmitter(DataEncoder encoder) {
-            this.encoder = encoder;
-        }
-
-        @Override
-        public void setSessionMapper(SessionMapper sessionMapper) {
-            this.sessionMapper = sessionMapper;
-        }
-
-        @Override
-        public SessionMapper getSessionMapper() {
-            return sessionMapper;
-        }
-
-        @Override
-        public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
-            LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
-            try {
-                handler.channelRegistered(channelContext);
-                handler.channelActive(channelContext);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        protected void flush(LocalChannelContext channelContext) {
-            SessionInfo sessionInfo = channelContext.getSessionInfo();
-            FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler;
-            SynchWrapper wrapper = (SynchWrapper) flowProcessor.getSupervisor();
-            SynchFlowWorker clientWorker = (SynchFlowWorker) wrapper.getFlowWorker();
-
-            FlowContext flowContext = clientWorker.flowContext(sessionInfo);
-            SessionPayloadEvent event = flowContext.sentEvent();
-
-            HttpRequest request = (HttpRequest) event.getRequest();
-            HttpResponse response = (HttpResponse) event.getResponse();
-            ByteBuff buff = new HeapByteBuff();
-            encoder.encode(response, buff);
-
-            executor.execute(() -> {
-                try {
-                    flowProcessor.dataReceived(channelContext, buff);
-                } catch (Exception ex) {
-                    ex.printStackTrace();
-                }
-            });
-        }
-
-        protected void close(LocalChannelContext channelContext) {
-            try {
-                channelContext.handler.channelInactive(channelContext);
-                channelContext.handler.channelUnregistered(channelContext);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        @Override
-        public boolean isStarted() {
-            return started;
-        }
-
-        @Override
-        public void start() {
-            if (sessionMapper == null) {
-                sessionMapper = new PassThroughSessionMapper();
-            }
-
-            started = true;
-        }
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
 
-        @Override
-        public void stop() {
-            started = false;
-        }
-
-        @Override
-        public boolean isCollectMetrics() {
-            return false;
-        }
-
-        @Override
-        public void setCollectMetrics(boolean collectMetrics) {
-        }
-
-        @Override
-        public void writeMetrics(MetricsContainer container) {
-        }
-
-        @Override
-        public void configure(Configuration config, ConfigurationContext context) {
-        }
-    }
-
-    private static class LocalChannelContext implements ChannelContext {
-
-        private final LocalEmitter emitter;
-
-        private final EmitterHandler handler;
-
-        private final SessionInfo sessionInfo;
-
-        private final Queue<ByteBuffer> dataQueue;
-
-        private SocketAddress localAddress;
-
-        private SocketAddress remoteAddress;
-
-        protected boolean bidirectional = true;
-
-        private FlowContext flowContext;
-
-        public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
-            this.emitter = emitter;
-            this.handler = handler;
-            this.remoteAddress = remoteAddress;
-            this.sessionInfo = sessionInfo;
-            this.dataQueue = new LinkedList<>();
-        }
-
-        @Override
-        public boolean isBidirectional() {
-            return bidirectional;
-        }
-
-        @Override
-        public void setBidirectional(boolean unidirectional) {
-            this.bidirectional = unidirectional;
-        }
+public class SynchFlowWorkerTest extends AbstractFlowWorkerTest {
 
-        @Override
-        public boolean isConnected() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public boolean isConnectionPending() {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        private void addToQeueu(ByteBuffer buffer) throws IOException {
-            dataQueue.add(buffer);
-        }
-
-        @Override
-        public void write(byte[] data, int offset, int length) throws IOException {
-            addToQeueu(ByteBuffer.wrap(data, offset, length));
-        }
-
-        @Override
-        public void write(ByteBuff data) throws IOException {
-            addToQeueu(data.toNioByteBuffer());
-        }
-
-        @Override
-        public void flush() throws IOException {
-            emitter.flush(this);
-        }
-
-        @Override
-        public void close() throws IOException {
-            emitter.close(this);
-        }
-
-        @Override
-        public SocketAddress getLocalAddress() {
-            return localAddress;
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress() {
-            return remoteAddress;
-        }
-
-        @Override
-        public SessionInfo getSessionInfo() {
-            return sessionInfo;
-        }
-
-        @Override
-        public FlowContext getFlowContext() {
-            return flowContext;
-        }
-
-        @Override
-        public void setFlowContext(FlowContext flowContext) {
-            this.flowContext = flowContext;
-        }
-    }
-
-    private SynchFlowWorker createWorker() {
-        LocalEmitter emitter = new LocalEmitter();
+    @Override
+    protected FlowWorker createWorker(Emitter emitter) {
         SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0);
         worker.setListener(listener);
         return worker;
     }
 
-    private List<Event> readEvents(String pcapFile) throws Exception {
-        Map<String, Object> props = new HashMap<>();
-        props.put("allowPartialSession", true);
-        props.put("ports", 4214);
-        return EventUtils.readEvents(pcapFile, props);
-    }
-
-    @AfterMethod
-    public void afterMethod() {
-        listener.clear();
-    }
-
-    private void join(SynchFlowWorker worker) {
-        try {
-            worker.join(JOIN_TIMEOUT);
-        } catch (InterruptedException ignore) {
-
-        }
-
-        assertFalse("Worker is still working.", worker.isWorking());
-    }
-
-    private List<Event> readDefaultEvents() throws Exception {
-        List<Event> events = readEvents("pcap/http/http_req_resp.pcap");
-        assertEquals(4, events.size());
-        return events;
-    }
-
-    @Test
-    public void testConnectSuccess() {
-
+    protected SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext) {
+        SessionInfo sessionInfo = channelContext.getSessionInfo();
+        FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler;
+        SynchFlowWorker.SynchWrapper wrapper = (SynchFlowWorker.SynchWrapper) flowProcessor.getSupervisor();
+        SynchFlowWorker clientWorker = (SynchFlowWorker) wrapper.getFlowWorker();
+        FlowContext flowContext = clientWorker.flowContext(sessionInfo);
+        return flowContext.sentEvent();
     }
 
-    @Test
-    public void testHandle_HTTP_SimpleRequestResponse() throws Exception {
-        List<Event> events = readDefaultEvents();
-        SynchFlowWorker worker = createWorker();
-        worker.start();
-        SessionEvent sessionEvent = (SessionEvent) events.get(0);
-        worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED));
-        events.forEach(worker::handle);
-        join(worker);
-
-        assertHttpClientEvents(events, listener.events());
-    }
-
-    @Test
-    public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception {
-        List<Event> events = readDefaultEvents();
-        SynchFlowWorker worker = createWorker();
-        worker.setConnectPartialSession(true);
-        worker.start();
-        events.forEach(worker::handle);
-        join(worker);
-        assertHttpClientEvents(events, listener.events());
-    }
-
-    @Test
-    public void testHandle_EmitterException_SendErrorsNotReached() throws Exception {
-        List<Event> events = readDefaultEvents();
-        LocalEmitter emitter = new LocalEmitter((object, out) -> {
-            throw new RuntimeException("Test exception");
-        });
-        SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0);
-        worker.setListener(listener);
-        worker.setConnectPartialSession(true);
-        worker.start();
-        events.forEach(worker::handle);
-        join(worker);
-    }
-
-    @Test(enabled = false)
-    public void testHandle_EncoderException() throws Exception {
-        List<Event> events = readDefaultEvents();
-        SynchFlowWorker worker = createWorker();
-        /*worker.setClientFactory(protocolId -> {
-
-        });*/
-    }
 }
\ No newline at end of file