changeset 1033:386815ce52ee

FlowWorkerBase refactorization in progress
author Devel 2
date Fri, 03 Apr 2020 15:08:47 +0200
parents d136672f267c
children 51ef66fadb5c
files stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java
diffstat 18 files changed, 212 insertions(+), 811 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu Apr 02 15:34:59 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,543 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.commons.Assert;
-import com.passus.commons.annotations.Plugin;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.plugin.PluginConstants;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-@Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
-public class AsynchFlowWorker extends FlowWorkerBase {
-
-    public static final String TYPE = "asynch";
-
-    private long waitTimeout = 100;
-
-    private long windowPeriod = 10;
-
-    private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>();
-
-    private TasksTimeWindow currentWindow;
-
-    private volatile boolean processWindow = false;
-
-    private volatile boolean flowStateChanged = false;
-
-    private final Object readerLock = new Object();
-
-    private boolean responseSynch = false;
-
-    public AsynchFlowWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-
-    }
-
-    public long getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    public void setWindowPeriod(long windowPeriod) {
-        Assert.greaterThanZero(windowPeriod, "tickTime");
-        this.windowPeriod = windowPeriod;
-    }
-
-    public long getWaitTimeout() {
-        return waitTimeout;
-    }
-
-    public void setWaitTimeout(long waitTimeout) {
-        Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime");
-        this.waitTimeout = waitTimeout;
-    }
-
-    public boolean isResponseSynch() {
-        return responseSynch;
-    }
-
-    public void setResponseSynch(boolean responseSynch) {
-        this.responseSynch = responseSynch;
-    }
-
-    private void waitQuietly() {
-        try {
-            lock.wait(waitTimeout);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    @Override
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            boolean wait;
-            do {
-                wait = false;
-                for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.isEventSent()) {
-                        wait = true;
-                        break;
-                    }
-                }
-
-                if (wait) {
-                    try {
-                        lock.wait(100);
-                    } catch (Exception e) {
-                    }
-                }
-            } while (wait);
-
-            super.closeAllConnections();
-            while (!sessions.isEmpty()) {
-                try {
-                    lock.wait(100);
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    private void removeWindow(TasksTimeWindow window) {
-        if (window != null) {
-            windows.remove(window);
-        }
-    }
-
-    private TasksTimeWindow createWindow(long time) {
-        TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod);
-        windows.add(window);
-        return window;
-    }
-
-    private TasksTimeWindow getWindow(long time, boolean create) {
-        if (windows.isEmpty()) {
-            if (create) {
-                return createWindow(time);
-            }
-
-            return null;
-        }
-
-        TasksTimeWindow firstWindow = windows.peek();
-        if (firstWindow.inTimeRange(time)) {
-            return firstWindow;
-        } else if (windows.size() > 1) {
-            Iterator<TasksTimeWindow> it = windows.iterator();
-            it.next();
-
-            while (it.hasNext()) {
-                TasksTimeWindow window = it.next();
-                if (window.inTimeRange(time)) {
-                    return window;
-                }
-            }
-        }
-
-        if (create) {
-            return createWindow(time);
-        }
-
-        return null;
-    }
-
-    @Override
-    protected void flowStateChanged(FlowContext context, int oldState) {
-        synchronized (lock) {
-            flowStateChanged = true;
-            lock.notifyAll();
-        }
-    }
-
-    private void addEvent(Event event, TasksTimeWindow window) {
-        switch (event.getType()) {
-            case SessionPayloadEvent.TYPE: {
-                Event newEvent = eventInstanceForWorker(event);
-                long time = newEvent.getTimestamp();
-                SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent;
-                SessionInfo session = payloadEvent.getSessionInfo();
-
-                SessionEventsTask task = window.getSessionEventsTask(session, true);
-                task.events.add(payloadEvent);
-
-                if (responseSynch) {
-                    HttpResponse resp = (HttpResponse) payloadEvent.getResponse();
-                    HttpRequest req = (HttpRequest) payloadEvent.getRequest();
-                    long respTime = time + (resp.getTimestamp() - req.getTimestamp());
-                    HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime);
-
-                    task.events.add(respEvent);
-                }
-
-                break;
-            }
-            case SessionStatusEvent.TYPE: {
-                Event newEvent = eventInstanceForWorker(event);
-                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
-                SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true);
-                task.events.add(statusEvent);
-                break;
-            }
-            case DataEvents.DataLoopEnd.TYPE:
-                window.add(DataLoopEndTask.INSTANCE);
-                processWindow = true;
-                break;
-            case DataEvents.DataEnd.TYPE:
-                window.add(DataEndTask.INSTANCE);
-                processWindow = true;
-                break;
-        }
-    }
-
-    @Override
-    public void handle(Event event) {
-        synchronized (readerLock) {
-            while (processWindow) {
-                try {
-                    readerLock.wait();
-                } catch (InterruptedException ignore) {
-
-                }
-            }
-        }
-
-        synchronized (lock) {
-            TasksTimeWindow window = getWindow(event.getTimestamp(), true);
-            if (currentWindow == null) {
-                currentWindow = window;
-            } else if (window != currentWindow) {
-                currentWindow = windows.peek();
-                processWindow = true;
-            }
-
-            addEvent(event, window);
-            if (processWindow) {
-                lock.notifyAll();
-            }
-        }
-
-    }
-
-    private boolean processSessionEvent(SessionEvent event) {
-        switch (event.getType()) {
-            case SessionStatusEvent.TYPE: {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                    connect(statusEvent);
-                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                    FlowContext flowContext = flowContext(statusEvent);
-                    if (flowContext != null) {
-                        if (!flowContext.isEventSent()) {
-                            disconnect(statusEvent);
-                        }
-                    }
-                }
-
-                return true;
-            }
-            case SessionPayloadEvent.TYPE: {
-                SessionEvent sessEvent = event;
-                FlowContext flowContext = flowContext(sessEvent);
-                if (flowContext != null) {
-                    switch (flowContext.state()) {
-                        case FlowContext.STATE_CONNECTING:
-                            return false;
-                        case FlowContext.STATE_CONNECTED:
-                            if (flowContext.isEventSent()) {
-                                return false;
-                            } else {
-                                send(flowContext, (SessionPayloadEvent) event, true);
-                                return true;
-                            }
-                        case FlowContext.STATE_DISCONNECTING:
-                        case FlowContext.STATE_DISCONNECTED:
-                            if (connectPartialSession) {
-                                connect(sessEvent);
-                            } else {
-                                return true;
-                            }
-                            break;
-                    }
-                } else if (connectPartialSession) {
-                    connect(sessEvent);
-                }
-
-                return true;
-            }
-            case HttpResponseEvent.TYPE: {
-                SessionEvent sessEvent = event;
-                FlowContext flowContext = flowContext(sessEvent);
-                if (flowContext != null) {
-                    return (flowContext.state() >= FlowContext.STATE_DISCONNECTING);
-                }
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    lock.wait();
-
-                    boolean dataLoopEnd = false;
-                    boolean dataEnd = false;
-
-                    for (; ; ) {
-                        Iterator<Task> it = currentWindow.tasks.iterator();
-                        while (it.hasNext()) {
-                            Task task = it.next();
-                            switch (task.type()) {
-                                case SessionEventsTask.TYPE:
-                                    SessionEventsTask sessionTask = (SessionEventsTask) task;
-                                    if (isBlockedSession(sessionTask.session)) {
-                                        sessionTask.events.clear();
-                                        it.remove();
-                                        return;
-                                    }
-
-                                    if (!sessionTask.events.isEmpty()) {
-                                        Event event = sessionTask.events.get(0);
-                                        if (processSessionEvent((SessionEvent) event)) {
-                                            sessionTask.events.remove(0);
-                                        }
-                                    }
-
-                                    if (sessionTask.events.isEmpty()) {
-                                        it.remove();
-                                    }
-
-                                    break;
-                                case DataLoopEndTask.TYPE:
-                                    dataLoopEnd = true;
-                                    it.remove();
-                                    break;
-                                case DataEndTask.TYPE:
-                                    dataEnd = true;
-                                    it.remove();
-                                    break;
-                            }
-                        }
-
-                        if (currentWindow.tasks.isEmpty()) {
-                            break;
-                        } else if (!flowStateChanged) {
-                            waitQuietly();
-                        } else {
-                            flowStateChanged = false;
-                        }
-                    }
-
-                    if (dataLoopEnd) {
-                        closeAllConnections();
-                    }
-
-                    if (dataEnd) {
-                        working = false;
-                    }
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
-                } finally {
-                    removeWindow(currentWindow);
-                    processWindow = false;
-                    synchronized (readerLock) {
-                        readerLock.notifyAll();
-                    }
-                }
-            }
-        }
-    }
-
-    private static final class HttpResponseEvent extends SessionEvent {
-
-        public static final int TYPE = 1012;
-
-        private final HttpResponse payload;
-
-        public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) {
-            super(sessionInfo, sourceName);
-            this.payload = payload;
-            setTimestamp(timestamp);
-        }
-
-        public HttpResponse getPayload() {
-            return payload;
-        }
-
-        @Override
-        public SessionEvent instanceForWorker(int index) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public int getType() {
-            return TYPE;
-        }
-
-    }
-
-    private static interface Task {
-
-        public int type();
-
-    }
-
-    private static final class SessionEventsTask implements Task {
-
-        public static final int TYPE = 1;
-
-        private final SessionInfo session;
-
-        private final List<SessionEvent> events = new LinkedList<>();
-
-        public SessionEventsTask(SessionInfo session) {
-            this.session = session;
-        }
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class DataLoopEndTask implements Task {
-
-        public static final DataLoopEndTask INSTANCE = new DataLoopEndTask();
-
-        public static final int TYPE = 2;
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class DataEndTask implements Task {
-
-        public static final DataEndTask INSTANCE = new DataEndTask();
-
-        public static final int TYPE = 3;
-
-        @Override
-        public int type() {
-            return TYPE;
-        }
-
-    }
-
-    private static final class TasksTimeWindow {
-
-        private final long startTime;
-
-        private final long endTime;
-
-        private final List<Task> tasks = new LinkedList<>();
-
-        private TasksTimeWindow(long time, long period) {
-            int factor = (int) (time / period);
-            startTime = factor * period;
-            endTime = (factor + 1) * period;
-        }
-
-        public long startTime() {
-            return startTime;
-        }
-
-        public long endTime() {
-            return endTime;
-        }
-
-        public boolean inTimeRange(long time) {
-            return (startTime <= time && time < endTime);
-        }
-
-        public void add(Task value) {
-            tasks.add(value);
-        }
-
-        private SessionEventsTask createSessionEventsTask(SessionInfo session) {
-            SessionEventsTask task = new SessionEventsTask(session);
-            tasks.add(task);
-            return task;
-        }
-
-        public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) {
-            if (tasks.isEmpty()) {
-                if (create) {
-                    return createSessionEventsTask(session);
-                }
-
-                return null;
-            }
-
-            for (Task task : tasks) {
-                if (task.type() == SessionEventsTask.TYPE) {
-                    SessionEventsTask sessionTask = (SessionEventsTask) task;
-                    if (sessionTask.session.equals(session)) {
-                        return sessionTask;
-                    }
-                }
-            }
-
-            if (create) {
-                return createSessionEventsTask(session);
-            }
-
-            return null;
-        }
-
-        public List<Task> values() {
-            return tasks;
-        }
-
-        public long getPeroid() {
-            return (endTime - startTime);
-        }
-
-        @Override
-        public String toString() {
-            return "TimeWindow{"
-                    + "startTimestamp=" + startTime
-                    + ", endTimestamp=" + endTime + '}';
-        }
-
-        @Override
-        public int hashCode() {
-            int hash = 5;
-            hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32));
-            hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32));
-            return hash;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            } else if (!(obj instanceof TasksTimeWindow)) {
-                return false;
-            }
-
-            final TasksTimeWindow other = (TasksTimeWindow) obj;
-            return this.startTime == other.startTime
-                    && this.endTime == other.endTime;
-        }
-
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -36,16 +36,18 @@
 
     protected SessionPayloadEvent sentEvent;
 
-    protected byte state = STATE_CONNECTING;
+    protected byte state = STATE_NEW;
 
     protected long connectionTime = -1;
 
     protected long timeout = -1;
 
+    protected long writeStartTime = -1;
+
+    protected long writeEndTime = -1;
+
     protected long receivedStartTime = -1;
 
-    protected long sendStartTime = -1;
-
     protected int loop;
 
     protected FlowHandler client;
@@ -92,6 +94,11 @@
         lockCond.signal();
     }
 
+    void signalAndUnlock() {
+        signal();
+        unlock();
+    }
+
     public boolean isBidirectional() {
         return bidirectional;
     }
@@ -163,11 +170,11 @@
     }
 
     public long sendStartTimestamp() {
-        return sendStartTime;
+        return writeStartTime;
     }
 
     public void sendStartTimestamp(long sendStartTimestamp) {
-        this.sendStartTime = sendStartTimestamp;
+        this.writeStartTime = sendStartTimestamp;
     }
 
     public SessionPayloadEvent sentEvent() {
@@ -269,6 +276,9 @@
         timeout = -1;
         error = null;
         blocked = false;
+        writeStartTime = -1;
+        writeEndTime = -1;
+        receivedStartTime = -1;
 
         if (params != null) {
             params.clear();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Fri Apr 03 15:08:47 2020 +0200
@@ -29,8 +29,6 @@
 
     private final Timeouts timeouts = new Timeouts();
 
-    protected final Object lock = new Object();
-
     protected volatile boolean working = false;
 
     private long checkTimeoutsPeriod = 5_000;
@@ -61,11 +59,14 @@
     @Override
     public int activeConnections() {
         int count = 0;
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
+        for (FlowContext flowContext : sessions.values()) {
+            flowContext.lock();
+            try {
                 if (flowContext.state() != STATE_DISCONNECTED) {
                     count++;
                 }
+            } finally {
+                flowContext.unlock();
             }
         }
 
@@ -175,22 +176,21 @@
     }
 
     protected FlowContext connect(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = register(session);
-                if (flowContext != null) {
-                    connect(flowContext, true);
-                }
-            } catch (Exception e) {
-                logger.error(e);
-            }
-            return null;
+        FlowContext flowContext = register(session);
+        if (flowContext != null) {
+            connect(flowContext, true);
         }
+
+        return flowContext;
     }
 
     protected FlowContext registerAndConnect(SessionInfo session, boolean wait) {
         FlowContext flowContext = flowContext(session);
         if (flowContext != null) {
+            if (flowContext.blocked) {
+                return flowContext;
+            }
+
             throw new RuntimeException("Not implemented yet.");
         } else {
             flowContext = register(session);
@@ -208,6 +208,7 @@
         flowContext.lock.lock();
         try {
             flowContext.connectionAttempts++;
+            flowContext.state = STATE_CONNECTING;
             emitter.connect(flowContext.session, this, index);
             if (wait) {
                 waitOpFinished(flowContext, STATE_CONNECTED);
@@ -268,8 +269,12 @@
         }
 
         long now = timeGenerator.currentTimeMillis();
-        flowContext.lock.lock();
+        flowContext.lock();
         try {
+            if (trace) {
+                debug(flowContext, "Disconnecting.");
+            }
+
             if (flowContext.state == STATE_DISCONNECTING
                     || flowContext.state == STATE_DISCONNECTED) {
                 return;
@@ -302,7 +307,17 @@
         } catch (InterruptedException e) {
             error(flowContext, e);
         } finally {
-            flowContext.lock.unlock();
+            flowContext.signalAndUnlock();
+        }
+    }
+
+    protected void disconnectAllConnections() {
+        disconnectAllConnections(true);
+    }
+
+    protected void disconnectAllConnections(boolean wait) {
+        for (FlowContext flowContext : sessions.values()) {
+            disconnect(flowContext, wait);
         }
     }
 
@@ -363,39 +378,6 @@
 
     }
 
-    protected void removeFlowContext(FlowContext flowContext) {
-        synchronized (lock) {
-            debug(flowContext, "removeFlowContext");
-            sessions.remove(flowContext.sessionInfo());
-        }
-    }
-
-    /*protected void reconnect(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Reconnect (state: {}).", stateToString(flowContext.state()));
-                }
-
-                SessionInfo session = flowContext.sessionInfo();
-                updateFlowState(flowContext, FlowContext.STATE_CONNECTING);
-                emitter.connect(session, this, index);
-            } catch (Exception e) {
-                error(flowContext, e.getMessage(), e);
-            }
-        }
-    }*/
-
-    protected void closeAllConnections() {
-        closeAllConnections(true);
-    }
-
-    protected void closeAllConnections(boolean wait) {
-        for (FlowContext flowContext : sessions.values()) {
-            disconnect(flowContext, wait);
-        }
-    }
-
     private void sleepSilently(long millis) {
         if (millis == 0) {
             return;
@@ -440,7 +422,7 @@
             try {
                 context.setBidirectional(flowContext.isBidirectional());
                 flowContext.channelContext(context);
-                context.setAttachment(flowContext);
+                context.setFlowContext(flowContext);
                 flowContext.connectionAttempts = 0;
                 flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
                 flowContext.state = STATE_CONNECTED;
@@ -463,7 +445,7 @@
 
     @Override
     public void channelInactive(ChannelContext context) throws Exception {
-        FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = (FlowContext) context.getFlowContext();
         if (logger.isDebugEnabled()) {
             debug(flowContext, "Channel inactive.");
         }
@@ -489,24 +471,25 @@
 
     @Override
     public void sessionInvalidated(SessionInfo session) throws Exception {
-        synchronized (lock) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Session {} invalidated.", session);
-            }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Session {} invalidated.", session);
+        }
 
-            FlowContext flowContext = flowContext(session);
-            if (flowContext != null) {
+        FlowContext flowContext = flowContext(session);
+        if (flowContext != null) {
+            flowContext.lock();
+            try {
                 disconnect(flowContext);
+                addBlockedSession(session);
+            } finally {
+                flowContext.signalAndUnlock();
             }
-
-            addBlockedSession(session);
-            lock.notifyAll();
         }
     }
 
     @Override
     public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = (FlowContext) context.getFlowContext();
         logger.debug("dataReceived");
         flowContext.lock();
         try {
@@ -573,42 +556,43 @@
                 error(flowContext, FlowError.unknownError());
             }
         } finally {
-            flowContext.signal();
-            flowContext.unlock();
+            flowContext.signalAndUnlock();
         }
     }
 
     @Override
     public void dataWriteStart(ChannelContext context) {
-        synchronized (lock) {
-            FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = context.getFlowContext();
+        flowContext.lock();
+        try {
             if (flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
-                flowContext.sendStartTimestamp(now);
-                flowContext.client().onDataWriteStart(flowContext);
+                flowContext.writeStartTime = timeGenerator.currentTimeMillis();
+                flowContext.writeEndTime = -1;
+                flowContext.client.onDataWriteStart(flowContext);
             }
+        } finally {
+            flowContext.signalAndUnlock();
         }
     }
 
     @Override
     public void dataWritten(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = context.getFlowContext();
+        flowContext.lock();
+        try {
             if (flowContext.isEventSent()) {
+                long now = timeGenerator.currentTimeMillis();
                 if (collectMetric) {
-                    long now = timeGenerator.currentTimeMillis();
                     synchronized (metric) {
                         metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
                     }
                 }
 
-                flowContext.client().onDataWriteEnd(flowContext);
-                if (!flowContext.isBidirectional()) {
-                    responseReceived0(flowContext, null);
-                }
+                flowContext.writeEndTime = now;
+                flowContext.client.onDataWriteEnd(flowContext);
             }
-
-            lock.notifyAll();
+        } finally {
+            flowContext.signalAndUnlock();
         }
     }
 
@@ -618,7 +602,7 @@
             logger.debug("Error occurred. ", cause);
         }
 
-        FlowContext flowContext = (FlowContext) context.getAttachment();
+        FlowContext flowContext = (FlowContext) context.getFlowContext();
         //Jezeli nie nastapilo polaczenie flowContext == null
         if (flowContext == null) {
             flowContext = flowContext(context);
@@ -627,9 +611,13 @@
         flowContext.lock.lock();
         try {
             if (flowContext.state == STATE_CONNECTING) {
-                if (flowContext.connectionAttempts < maxConnectionAttempts) {
-                    //TODO - malo optymalne, blokuje przetwarzanie eventow dla konkretnej sesji.
-                    // Odbije sie nw wydajnosci workera asynch.
+                if (flowContext.connectionAttempts <= maxConnectionAttempts) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Connection failed. Reconnection (attempt {}/{}, delay {}ms).",
+                                flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay,
+                                cause);
+                    }
+
                     if (reconnectDelay > 0) {
                         try {
                             Thread.sleep(reconnectDelay);
@@ -640,6 +628,10 @@
 
                     connect(flowContext);
                     return;
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts, cause);
+                    }
                 }
             }
         } finally {
@@ -650,10 +642,8 @@
     }
 
     protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) {
-        logger.debug("send");
         flowContext.lock();
         try {
-            logger.debug("send-after lock");
             Object req = event.getRequest();
             if (req != null) {
                 if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
@@ -688,12 +678,18 @@
 
                 try {
                     flowContext.sentEvent = event;
+                    flowContext.writeStartTime = -1;
+                    flowContext.writeEndTime = -1;
                     flowContext.channelContext().writeAndFlush(buffer);
                     requestSent0(flowContext, event);
                     buffer.clear();
 
-                    if (wait && flowContext.isBidirectional()) {
-                        waitForResponse(flowContext);
+                    if (wait) {
+                        if (flowContext.isBidirectional()) {
+                            waitForResponse(flowContext);
+                        } else {
+                            waitForWriteEnd(flowContext);
+                        }
                     }
                 } catch (Exception e) {
                     flowContext.sendErrors++;
@@ -713,6 +709,23 @@
         }
     }
 
+    protected boolean waitForWriteEnd(FlowContext flowContext) throws InterruptedException {
+        return waitForWriteEnd(flowContext, timeouts.getDefaultTimeout());
+    }
+
+    protected boolean waitForWriteEnd(FlowContext flowContext, long timeout) throws InterruptedException {
+        long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
+        while (flowContext.writeEndTime == -1 && !flowContext.isError()) {
+            if (timeNanos <= 0) {
+                return false;
+            }
+
+            timeNanos = flowContext.lockCond.awaitNanos(timeNanos);
+        }
+
+        return true;
+    }
+
     protected boolean waitForResponse(FlowContext flowContext) throws InterruptedException {
         return waitForResponse(flowContext, timeouts.getDefaultTimeout());
     }
@@ -783,34 +796,32 @@
     }
 
     protected void processTimeouts() {
-        synchronized (lock) {
-            try {
-                long now = timeGenerator.currentTimeMillis();
-                if (nextCheckTimeoutsTime == -1) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                } else if (nextCheckTimeoutsTime > now) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                    for (FlowContext flowContext : sessions.values()) {
-                        if (flowContext.timeouted(now)) {
-                            if (logger.isDebugEnabled()) {
-                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
-                                        flowContext.sessionInfo(),
-                                        stateToString(flowContext.state()));
-                            }
+        try {
+            long now = timeGenerator.currentTimeMillis();
+            if (nextCheckTimeoutsTime == -1) {
+                nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+            } else if (nextCheckTimeoutsTime > now) {
+                nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                for (FlowContext flowContext : sessions.values()) {
+                    if (flowContext.timeouted(now)) {
+                        if (logger.isDebugEnabled()) {
+                            debug(flowContext, "Flow for session '{}' timed out (state '{}').",
+                                    flowContext.sessionInfo(),
+                                    stateToString(flowContext.state()));
+                        }
 
-                            switch (flowContext.state()) {
-                                case FlowContext.STATE_CONNECTING:
-                                case FlowContext.STATE_CONNECTED:
-                                    disconnect(flowContext);
-                                    break;
-                            }
+                        switch (flowContext.state()) {
+                            case FlowContext.STATE_CONNECTING:
+                            case FlowContext.STATE_CONNECTED:
+                                disconnect(flowContext);
+                                break;
                         }
                     }
                 }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
+            }
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getMessage(), e);
             }
         }
     }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Fri Apr 03 15:08:47 2020 +0200
@@ -7,7 +7,10 @@
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
 
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 
@@ -67,7 +70,6 @@
         this.eventsQueueWaitTime = eventsQueueWaitTime;
     }
 
-    @Override
     protected void removeFlowContext(FlowContext flowContext) {
         if (flowContext != null) {
             flowIndex.remove(flowContext);
@@ -82,7 +84,7 @@
     }
 
     private void waitCloseAllConnections() {
-        closeAllConnections = true;
+        /*closeAllConnections = true;
         synchronized (lock) {
             while (!flowIndex.isEmpty()) {
                 try {
@@ -92,7 +94,8 @@
             }
         }
 
-        closeAllConnections = false;
+        closeAllConnections = false;*/
+        throw new RuntimeException("Not implemented.");
     }
 
 /*    @Override
@@ -168,10 +171,8 @@
     }
 
     private void makeFirst(LocalFlowContext flowContext) {
-        synchronized (lock) {
-            flowIndex.remove(flowContext);
-            flowIndex.addFirst(flowContext);
-        }
+        flowIndex.remove(flowContext);
+        flowIndex.addFirst(flowContext);
     }
 
     private void addToQueue(LocalFlowContext flowContext, Event event) {
@@ -194,13 +195,10 @@
         }
 
         if (newEvent != null) {
-            synchronized (lock) {
-                try {
-                    eventsQueue.put(newEvent);
-                    lock.notifyAll();
-                } catch (Exception e) {
-                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
-                }
+            try {
+                eventsQueue.put(newEvent);
+            } catch (Exception e) {
+                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
             }
         }
     }
@@ -271,23 +269,16 @@
 
     @Override
     public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    try {
-                        lock.wait(eventsQueueWaitTime);
-                    } catch (InterruptedException ignore) {
-                    }
-
-                    Event event;
-                    while ((event = eventsQueue.poll()) != null) {
-                        processEvent(event);
-                    }
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
+        working = true;
+        while (working) {
+            try {
+                Event event;
+                while ((event = eventsQueue.poll()) != null) {
+                    processEvent(event);
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
                 }
             }
         }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri Apr 03 15:08:47 2020 +0200
@@ -75,7 +75,6 @@
                 return;
             } else if (event.getType() == SessionPayloadEvent.TYPE) {
                 FlowContext flowContext = flowContext(sessEvent);
-
                 if (flowContext != null) {
                     if (flowContext.blocked) {
                         return;
@@ -99,7 +98,7 @@
                 logger.debug("DataLoopEnd received.");
             }
 
-            closeAllConnections();
+            disconnectAllConnections();
             filterChain.reset();
         } else if (event.getType() == DataEvents.DataEnd.TYPE) {
             if (logger.isDebugEnabled()) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -2,13 +2,14 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.FlowContext;
 
 import java.io.IOException;
 
 /**
  * @author Mirosław Hawrot
  */
-public interface ChannelContext<T> {
+public interface ChannelContext {
 
     boolean isBidirectional();
 
@@ -42,8 +43,8 @@
 
     SessionInfo getSessionInfo();
 
-    void setAttachment(T attachment);
+    void setFlowContext(FlowContext attachment);
 
-    T getAttachment();
+    FlowContext getFlowContext();
 
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.FlowContext;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
@@ -11,7 +12,7 @@
 import java.util.LinkedList;
 import java.util.Queue;
 
-public abstract class NioChannelContext<T, K> implements ChannelContext<K> {
+public abstract class NioChannelContext<T> implements ChannelContext {
 
     protected final NioEmitterWorker worker;
 
@@ -29,7 +30,7 @@
 
     protected SelectionKey key;
 
-    private K attachment;
+    private FlowContext flowContext;
 
     public NioChannelContext(NioEmitterWorker worker, T channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         this.worker = worker;
@@ -94,12 +95,12 @@
     }
 
     @Override
-    public K getAttachment() {
-        return attachment;
+    public FlowContext getFlowContext() {
+        return flowContext;
     }
 
     @Override
-    public void setAttachment(K attachment) {
-        this.attachment = attachment;
+    public void setFlowContext(FlowContext attachment) {
+        this.flowContext = attachment;
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Fri Apr 03 15:08:47 2020 +0200
@@ -3,6 +3,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.net.utils.AddressUtils;
+import com.passus.st.client.FlowContext;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
@@ -16,7 +17,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public class NioChannelContext2<K> implements ChannelContext<K> {
+public class NioChannelContext2 implements ChannelContext {
 
     private final NioEmitterWorker2 worker;
 
@@ -34,7 +35,7 @@
 
     private SelectionKey key;
 
-    private K attachment;
+    private FlowContext flowContext;
 
     /**
      * Usunac
@@ -125,12 +126,12 @@
     }
 
     @Override
-    public K getAttachment() {
-        return attachment;
+    public FlowContext getFlowContext() {
+        return flowContext;
     }
 
     @Override
-    public void setAttachment(K attachment) {
-        this.attachment = attachment;
+    public void setFlowContext(FlowContext flowContext) {
+        this.flowContext = flowContext;
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -8,7 +8,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 
-public class NioDatagramChannelContext<T> extends NioChannelContext<DatagramChannel, T> {
+public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> {
 
     public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -11,7 +11,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public class NioSocketChannelContext<T> extends NioChannelContext<SocketChannel, T> {
+public class NioSocketChannelContext extends NioChannelContext<SocketChannel> {
 
     public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -3,7 +3,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
-import com.passus.pcap.Pcap;
+import com.passus.st.client.FlowContext;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
@@ -12,7 +12,7 @@
 import java.util.LinkedList;
 import java.util.Queue;
 
-public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> {
+public class UnidirectionalPcapChannelContext implements ChannelContext {
 
     private static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
 
@@ -24,7 +24,7 @@
 
     final byte[] buffer;
 
-    private K attachment;
+    private FlowContext flowContext;
 
     private final EmitterHandler handler;
 
@@ -99,13 +99,13 @@
     }
 
     @Override
-    public void setAttachment(K attachment) {
-        this.attachment = attachment;
+    public void setFlowContext(FlowContext attachment) {
+        this.flowContext = attachment;
     }
 
     @Override
-    public K getAttachment() {
-        return attachment;
+    public FlowContext getFlowContext() {
+        return flowContext;
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.FlowContext;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
@@ -12,7 +13,7 @@
 import static com.passus.st.emitter.StatelessTasks.CLOSE_TASK;
 import static com.passus.st.emitter.StatelessTasks.FLUSH_TASK;
 
-public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
+public abstract class AbstractChannelContext implements ChannelContext {
 
     protected final Connection connection;
 
@@ -20,7 +21,7 @@
 
     private final Queue<byte[]> dataQueue;
 
-    private K attachment;
+    private FlowContext flowContext;
 
     public AbstractChannelContext(Connection connection) {
         this.connection = connection;
@@ -77,11 +78,11 @@
     }
 
     @Override
-    public K getAttachment() {
-        return attachment;
+    public FlowContext getFlowContext() {
+        return flowContext;
     }
 
-    public void setAttachment(K attachment) {
-        this.attachment = attachment;
+    public void setFlowContext(FlowContext attachment) {
+        this.flowContext = attachment;
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -1,6 +1,6 @@
 package com.passus.st.emitter.socket;
 
-public class DatagramSocketChannelContext<K> extends AbstractChannelContext<K> {
+public class DatagramSocketChannelContext extends AbstractChannelContext {
 
     public DatagramSocketChannelContext(Connection connection) {
         super(connection);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java	Fri Apr 03 15:08:47 2020 +0200
@@ -1,6 +1,6 @@
 package com.passus.st.emitter.socket;
 
-public class SocketChannelContext<K> extends AbstractChannelContext<K> {
+public class SocketChannelContext extends AbstractChannelContext {
 
     public SocketChannelContext(Connection connection) {
         super(connection);
--- a/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java	Thu Apr 02 15:34:59 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,78 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.commons.service.ServiceUtils;
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.emitter.RuleBasedSessionMapper;
-import com.passus.st.emitter.nio.NioEmitter;
-import com.passus.st.utils.EventUtils;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.*;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-
-public class AsynchFlowWorkerTest extends AbstractWireMockTest {
-
-    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
-        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
-        sessionMapper.addRule(mapperRule);
-
-        NioEmitter emitter = new NioEmitter();
-        emitter.setSessionMapper(sessionMapper);
-        return emitter;
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
-        String content = "test";
-        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
-                .willReturn(aResponse()
-                        .withHeader("Content-Type", "text/plain")
-                        .withHeader("Content-Length", "" + content.length())
-                        .withBody(content)));
-    }
-
-    @Test(enabled = false)
-    public void testHandle() throws Exception {
-        Map<String, Object> props = new HashMap<>();
-        props.put("allowPartialSession", true);
-        props.put("ports", 4214);
-        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
-        assertEquals(4, events.size());
-
-        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
-        emitter.start();
-
-        TestHttpClientListener listener = new TestHttpClientListener();
-
-        AsynchFlowWorker worker = new AsynchFlowWorker(emitter, "test", 0);
-        try {
-            worker.setListener(listener);
-            worker.start();
-
-            SessionEvent sessionEvent = (SessionEvent) events.get(0);
-            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED);
-            statusEvent.setTimestamp(sessionEvent.getTimestamp());
-            worker.handle(statusEvent);
-
-            events.forEach(worker::handle);
-
-            worker.join();
-            assertTrue(listener.size() > 0);
-            assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
-            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0);
-            String responseStr = event.getResponse().toString();
-            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
-            assertTrue(responseStr.endsWith("test"));
-        } finally {
-            ServiceUtils.stopQuietly(emitter);
-        }
-    }
-
-}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Fri Apr 03 15:08:47 2020 +0200
@@ -128,7 +128,7 @@
         TestFlowWorker worker = worker();
         ChannelContext channelContext = makeConnected(worker, session);
         FlowContext flowContext = worker.flowContext(session);
-        when(channelContext.getAttachment()).thenReturn(flowContext);
+        when(channelContext.getFlowContext()).thenReturn(flowContext);
 
         doAnswer((Answer<Void>) invocation -> {
                     worker.channelInactive(channelContext);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Fri Apr 03 15:08:47 2020 +0200
@@ -9,7 +9,6 @@
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseEncoder;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.utils.EventUtils;
@@ -19,7 +18,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -154,7 +152,7 @@
 
         protected boolean bidirectional = true;
 
-        private Object attachment;
+        private FlowContext flowContext;
 
         public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
             this.emitter = emitter;
@@ -224,13 +222,13 @@
         }
 
         @Override
-        public Object getAttachment() {
-            return attachment;
+        public FlowContext getFlowContext() {
+            return flowContext;
         }
 
         @Override
-        public void setAttachment(Object attachment) {
-            this.attachment = attachment;
+        public void setFlowContext(FlowContext flowContext) {
+            this.flowContext = flowContext;
         }
     }
 
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Apr 02 15:34:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Fri Apr 03 15:08:47 2020 +0200
@@ -4,15 +4,14 @@
 import com.passus.commons.utils.ArrayUtils;
 import com.passus.net.netflow.Netflow9;
 import com.passus.net.netflow.Netflow9Decoder;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.Event;
 import com.passus.st.client.FlowExecutor;
 import com.passus.st.emitter.RuleBasedSessionMapper;
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.utils.EventUtils;
+import com.passus.st.utils.server.DefaultServerListener;
 import com.passus.st.utils.server.SimpleDatagramServer;
-import com.passus.st.utils.server.DefaultServerListener;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
@@ -69,4 +68,14 @@
 
     }
 
+    @Test(enabled = false)
+    public void testHandle_Netflow2() throws Exception {
+        for (int i = 0; i < 1000; i++) {
+            if (i % 10 == 0) {
+                System.out.println(i);
+            }
+
+            testHandle_Netflow();
+        }
+    }
 }