changeset 973:641a1a8bcb12

PcapSessionEventSource - TCP/UDP session processing improvements + bugfixes
author Devel 2
date Wed, 24 Jul 2019 14:19:28 +0200
parents 6fc989064ecf
children e3532e4a84fe
files stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java
diffstat 9 files changed, 635 insertions(+), 728 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Wed Jul 24 14:19:28 2019 +0200
@@ -15,7 +15,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 @Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
-public class AsynchFlowWorker extends FlowWorkerBased {
+public class AsynchFlowWorker extends FlowWorkerBase {
 
     public static final String TYPE = "asynch";
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Wed Jul 24 14:19:28 2019 +0200
@@ -0,0 +1,566 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.data.HeapByteBuff;
+import com.passus.filter.Filter;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricsContainer;
+import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2LongMap;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.passus.st.client.FlowContext.*;
+import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
+
+public abstract class FlowWorkerBase extends FlowWorker {
+
+    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
+
+    protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>();
+
+    private final Set<SessionInfo> blockedSessions = new HashSet<>();
+
+    private final Int2LongMap timeouts = new Int2LongArrayMap();
+
+    protected final Object lock = new Object();
+
+    protected volatile boolean working = false;
+
+    private long checkTimeoutsPeriod = 5_000;
+
+    private long nextCheckTimeoutsTime = -1;
+
+    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+
+    private long lastEventTimestamp = -1;
+
+    public FlowWorkerBase(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+        timeouts.putAll(DEFAULT_TIMEOUTS);
+    }
+
+    @Override
+    public boolean isWorking() {
+        return working;
+    }
+
+    @Override
+    public int activeConnections() {
+        int count = 0;
+        synchronized (lock) {
+            for (FlowContext flowContext : sessions.values()) {
+                if (flowContext.state() != STATE_DISCONNECTED) {
+                    count++;
+                }
+            }
+        }
+
+        return count;
+    }
+
+    protected final void addBlockedSession(SessionInfo session) {
+        blockedSessions.add(session);
+    }
+
+    protected final boolean isBlockedSession(SessionInfo session) {
+        return !blockedSessions.isEmpty() && blockedSessions.contains(session);
+    }
+
+    public float getSleepFactor() {
+        return sleepFactor;
+    }
+
+    public void setSleepFactor(float sleepFactor) {
+        Assert.greaterOrEqualZero(sleepFactor, "sleepFactor");
+        this.sleepFactor = sleepFactor;
+    }
+
+    public long getCheckTimeoutsPeriod() {
+        return checkTimeoutsPeriod;
+    }
+
+    public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) {
+        Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod");
+        this.checkTimeoutsPeriod = checkTimeoutsPeriod;
+    }
+
+    protected final void changeFlowState(FlowContext flowContext, int state) {
+        try {
+            if (flowContext.state() == state) {
+                return;
+            }
+
+            int oldState = flowContext.state();
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Flow status changing {} -> {}.",
+                        contextStateToString(flowContext.state()),
+                        contextStateToString(state)
+                );
+            }
+
+            switch (state) {
+                case FlowContext.STATE_CONNECTING:
+                    flowContext.clear();
+                    break;
+                case FlowContext.STATE_CONNECTED:
+                    flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
+                    break;
+                case FlowContext.STATE_ERROR:
+                    changeFlowState(flowContext, STATE_DISCONNECTED);
+                    break;
+                case FlowContext.STATE_RESP_RECEIVED:
+                    flowContext.sentEvent(null);
+                    flowContext.receivedStartTimestamp(-1);
+                    break;
+                case FlowContext.STATE_DISCONNECTING:
+                    if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
+                        if (flowContext.channelContext() != null) {
+                            try {
+                                flowContext.channelContext().close();
+                            } catch (Exception e) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug(e.getMessage(), e);
+                                }
+                            }
+                        } else {
+                            changeFlowState(flowContext, STATE_DISCONNECTED);
+                        }
+                    } else {
+                        return;
+                    }
+                    break;
+                case STATE_DISCONNECTED:
+                    flowContext.state(STATE_DISCONNECTED);
+                    flowContext.clear();
+                    removeFlowContext(flowContext);
+                    flowStateChanged(flowContext, oldState);
+                    return;
+            }
+
+            long timeout = timeouts.get(flowContext.state());
+            flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
+            flowContext.state(state);
+            flowStateChanged(flowContext, oldState);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    protected void flowStateChanged(FlowContext context, int oldState) {
+
+    }
+
+    protected FlowContext flowContext(SessionEvent event) {
+        return flowContext(event.getSessionInfo());
+    }
+
+    protected FlowContext flowContext(ChannelContext context) {
+        return flowContext(context.getSessionInfo());
+    }
+
+    protected FlowContext flowContext(SessionInfo session) {
+        FlowContext context = sessions.get(session);
+        if (context == null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Context for session '" + session + "' not found.");
+            }
+        }
+
+        return context;
+    }
+
+    protected FlowContext createFlowContext(SessionInfo session) {
+        return new FlowContext(session);
+    }
+
+    protected FlowContext register(SessionEvent sessionEvent) {
+        return register(sessionEvent.getSessionInfo());
+    }
+
+    protected FlowContext register(SessionInfo session) {
+        synchronized (lock) {
+            if (sessions.containsKey(session)) {
+                logger.warn("Unable to register session '" + session + "'. Session already registered.");
+                return null;
+            }
+
+            FlowContext flowContext = createFlowContext(session);
+            //TODO Malo optymalne
+            FlowHandler client = clientFactory.create(session.getProtocolId());
+            client.init(flowContext);
+            flowContext.client(client);
+            sessions.put(session, flowContext);
+            return flowContext;
+        }
+    }
+
+    protected FlowContext connect(SessionEvent sessionEvent) {
+        return connect(sessionEvent.getSessionInfo());
+    }
+
+    protected FlowContext connect(SessionInfo session) {
+        synchronized (lock) {
+            try {
+                FlowContext flowContext = register(session);
+                if (flowContext != null) {
+                    emitter.connect(session, this, index);
+                    return flowContext;
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
+                FlowContext flowContext = entry.getValue();
+                try {
+                    closeSession(flowContext);
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, e.getMessage(), e);
+                    }
+                }
+            }
+
+            sessions.clear();
+            working = false;
+        }
+    }
+
+    protected void close(SessionEvent sessionEvent) {
+        close(sessionEvent.getSessionInfo());
+    }
+
+    protected void close(FlowContext flowContext) {
+        synchronized (lock) {
+            try {
+                closeSession(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close(SessionInfo session) {
+        synchronized (lock) {
+            try {
+                FlowContext flowContext = flowContext(session);
+                closeSession(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    protected void closeSession(FlowContext flowContext) {
+        synchronized (lock) {
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
+            }
+        }
+    }
+
+    protected void removeFlowContext(FlowContext flowContext) {
+        synchronized (lock) {
+            debug(flowContext, "removeFlowContext");
+            sessions.remove(flowContext.sessionInfo());
+        }
+    }
+
+    protected void reconnect(FlowContext flowContext) {
+        synchronized (lock) {
+            try {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state()));
+                }
+
+                SessionInfo session = flowContext.sessionInfo();
+                changeFlowState(flowContext, FlowContext.STATE_CONNECTING);
+                emitter.connect(session, this, index);
+            } catch (Exception e) {
+                error(flowContext, e.getMessage(), e);
+            }
+        }
+    }
+
+    protected void closeAllConnections() {
+        synchronized (lock) {
+            for (FlowContext flowContext : sessions.values()) {
+                closeSession(flowContext);
+            }
+        }
+    }
+
+    private void sleepSilently(long millis) {
+        if (millis == 0) {
+            return;
+        }
+        if (millis < 0) {
+            logger.warn("Cannot sleep for negative interval: {}.");
+            return;
+        }
+        logger.debug("Going sleep for: {}.", millis);
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ignore) {
+        }
+    }
+
+    protected void sleep(Event event) {
+        if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
+            if (lastEventTimestamp != -1) {
+                long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
+                sleepSilently(timeToSleep);
+            }
+            lastEventTimestamp = event.getTimestamp();
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        synchronized (lock) {
+            super.writeMetrics(container);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                            context.getLocalAddress(),
+                            context.getRemoteAddress());
+                }
+
+                context.setBidirectional(flowContext.isBidirectional());
+                flowContext.channelContext(context);
+                changeFlowState(flowContext, STATE_CONNECTED);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel inactive.");
+                }
+
+                changeFlowState(flowContext, STATE_DISCONNECTED);
+            }
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            try {
+                if (flowContext != null) {
+                    FlowHandler client = flowContext.client();
+                    FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
+                    decoder.decode(data, flowContext);
+                    long now = timeGenerator.currentTimeMillis();
+                    if (flowContext.receivedStartTimestamp() == -1) {
+                        flowContext.receivedStartTimestamp(now);
+                    }
+
+                    if (decoder.state() == DataDecoder.STATE_ERROR) {
+                        if (logger.isDebugEnabled()) {
+                            debug(flowContext, "Decoder error. " + decoder.getLastError());
+                        }
+
+                        decoder.clear(flowContext);
+                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                    } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                        Object resp = decoder.getResult();
+                        Object req = null;
+                        if (flowContext.sentEvent() != null) {
+                            req = flowContext.sentEvent().getRequest();
+                        }
+
+                        if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
+                            try {
+                                fireResponseReceived(req, resp, flowContext);
+                            } catch (Exception e) {
+                                error(flowContext, e.getMessage(), e);
+                            }
+                        }
+
+                        decoder.clear(flowContext);
+                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                    }
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, e.getMessage(), e);
+                }
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void dataWriteStart(ChannelContext context) {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null && flowContext.sentEvent() != null) {
+                long now = timeGenerator.currentTimeMillis();
+                flowContext.sendStartTimestamp(now);
+                flowContext.client().onDataWriteStart(flowContext);
+            }
+        }
+    }
+
+    @Override
+    public void dataWritten(ChannelContext context) throws Exception {
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null && flowContext.sentEvent() != null) {
+                long now = timeGenerator.currentTimeMillis();
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
+                    }
+                }
+
+                flowContext.client().onDataWriteEnd(flowContext);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
+        }
+
+        synchronized (lock) {
+            FlowContext flowContext = flowContext(context);
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_ERROR);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
+        synchronized (lock) {
+            Object req = event.getRequest();
+            if (req != null) {
+                if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
+                    return false;
+                }
+
+                FlowHandler client = flowContext.client();
+                FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
+                ByteBuff buffer = flowContext.buffer();
+                encoder.encode(req, flowContext, buffer);
+
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.incRequestsNum();
+                        metric.addRequestSize(flowContext.buffer().readableBytes());
+                    }
+                }
+
+                try {
+                    changeFlowState(flowContext, FlowContext.STATE_REQ_SENT);
+                    flowContext.sentEvent(event);
+                    flowContext.channelContext().writeAndFlush(buffer);
+                    buffer.clear();
+                    return true;
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, e.getMessage(), e);
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    protected void processTimeouts() {
+        synchronized (lock) {
+            try {
+                long now = timeGenerator.currentTimeMillis();
+                if (nextCheckTimeoutsTime == -1) {
+                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                } else if (nextCheckTimeoutsTime > now) {
+                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
+                    for (FlowContext flowContext : sessions.values()) {
+                        if (flowContext.timeouted()) {
+                            if (logger.isDebugEnabled()) {
+                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
+                                        flowContext.sessionInfo(),
+                                        contextStateToString(flowContext.state()));
+                            }
+
+                            switch (flowContext.state()) {
+                                case FlowContext.STATE_CONNECTING:
+                                case FlowContext.STATE_CONNECTED:
+                                case FlowContext.STATE_REQ_SENT:
+                                case FlowContext.STATE_ERROR:
+                                    closeSession(flowContext);
+                                    break;
+                                case FlowContext.STATE_RESP_RECEIVED:
+                                    //Dziwny blad nie powinien wystepowac
+                                    break;
+                                case FlowContext.STATE_DISCONNECTING:
+                                case STATE_DISCONNECTED:
+                                    removeFlowContext(flowContext);
+                                    break;
+                            }
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    protected Event eventInstanceForWorker(Event event) {
+        if (event instanceof SessionEvent) {
+            Event newEvent = ((SessionEvent) event).instanceForWorker(index);
+            newEvent.setTimestamp(event.getTimestamp());
+            return newEvent;
+        } else {
+            return event;
+        }
+    }
+
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Tue Jul 23 10:44:29 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,569 +0,0 @@
-package com.passus.st.client;
-
-import com.passus.commons.Assert;
-import com.passus.data.ByteBuff;
-import com.passus.data.DataDecoder;
-import com.passus.data.HeapByteBuff;
-import com.passus.filter.Filter;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.metric.MetricsContainer;
-import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
-import it.unimi.dsi.fastutil.ints.Int2LongMap;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.passus.st.client.FlowContext.*;
-import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
-
-public abstract class FlowWorkerBased extends FlowWorker {
-
-    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
-
-    protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>();
-
-    private final Set<SessionInfo> blockedSessions = new HashSet<>();
-
-    private final Int2LongMap timeouts = new Int2LongArrayMap();
-
-    protected final Object lock = new Object();
-
-    protected volatile boolean working = false;
-
-    private long checkTimeoutsPeriod = 5_000;
-
-    private long nextCheckTimeoutsTime = -1;
-
-    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
-
-    private long lastEventTimestamp = -1;
-
-    public FlowWorkerBased(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-        timeouts.putAll(DEFAULT_TIMEOUTS);
-    }
-
-    @Override
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public int activeConnections() {
-        int count = 0;
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
-                if (flowContext.state() != STATE_DISCONNECTED) {
-                    count++;
-                }
-            }
-        }
-
-        return count;
-    }
-
-    protected final void addBlockedSession(SessionInfo session) {
-        blockedSessions.add(session);
-    }
-
-    protected final boolean isBlockedSession(SessionInfo session) {
-        return !blockedSessions.isEmpty() && blockedSessions.contains(session);
-    }
-
-    public float getSleepFactor() {
-        return sleepFactor;
-    }
-
-    public void setSleepFactor(float sleepFactor) {
-        Assert.greaterOrEqualZero(sleepFactor, "sleepFactor");
-        this.sleepFactor = sleepFactor;
-    }
-
-    public long getCheckTimeoutsPeriod() {
-        return checkTimeoutsPeriod;
-    }
-
-    public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) {
-        Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod");
-        this.checkTimeoutsPeriod = checkTimeoutsPeriod;
-    }
-
-    protected final void changeFlowState(FlowContext flowContext, int state) {
-        try {
-            if (flowContext.state() == state) {
-                return;
-            }
-
-            int oldState = flowContext.state();
-            if (logger.isDebugEnabled()) {
-                debug(flowContext, "Flow status changing {} -> {}.",
-                        contextStateToString(flowContext.state()),
-                        contextStateToString(state)
-                );
-            }
-
-            switch (state) {
-                case FlowContext.STATE_CONNECTING:
-                    flowContext.clear();
-                    break;
-                case FlowContext.STATE_CONNECTED:
-                    flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
-                    break;
-                case FlowContext.STATE_ERROR:
-                    changeFlowState(flowContext, STATE_DISCONNECTED);
-                    break;
-                case FlowContext.STATE_RESP_RECEIVED:
-                    flowContext.sentEvent(null);
-                    flowContext.receivedStartTimestamp(-1);
-                    break;
-                case FlowContext.STATE_DISCONNECTING:
-                    if (flowContext.state() < FlowContext.STATE_DISCONNECTING) {
-                        if (flowContext.channelContext() != null) {
-                            try {
-                                flowContext.channelContext().close();
-                            } catch (Exception e) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug(e.getMessage(), e);
-                                }
-                            }
-                        } else {
-                            changeFlowState(flowContext, STATE_DISCONNECTED);
-                        }
-                    } else {
-                        return;
-                    }
-                    break;
-                case STATE_DISCONNECTED:
-                    flowContext.state(STATE_DISCONNECTED);
-                    flowContext.clear();
-                    removeFlowContext(flowContext);
-                    flowStateChanged(flowContext, oldState);
-                    return;
-            }
-
-            long timeout = timeouts.get(flowContext.state());
-            flowContext.timeout(timeGenerator.currentTimeMillis() + timeout);
-            flowContext.state(state);
-            flowStateChanged(flowContext, oldState);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-    }
-
-    protected void flowStateChanged(FlowContext context, int oldState) {
-
-    }
-
-    protected FlowContext flowContext(SessionEvent event) {
-        return flowContext(event.getSessionInfo());
-    }
-
-    protected FlowContext flowContext(ChannelContext context) {
-        return flowContext(context.getSessionInfo());
-    }
-
-    protected FlowContext flowContext(SessionInfo session) {
-        FlowContext context = sessions.get(session);
-        if (context == null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Context for session '" + session + "' not found.");
-            }
-        }
-
-        return context;
-    }
-
-    protected FlowContext createFlowContext(SessionInfo session) {
-        return new FlowContext(session);
-    }
-
-    protected FlowContext register(SessionEvent sessionEvent) {
-        return register(sessionEvent.getSessionInfo());
-    }
-
-    protected FlowContext register(SessionInfo session) {
-        synchronized (lock) {
-            if (sessions.containsKey(session)) {
-                logger.warn("Unable to register session '" + session + "'. Session already registered.");
-                return null;
-            }
-
-            FlowContext flowContext = createFlowContext(session);
-            //TODO Malo optymalne
-            FlowHandler client = clientFactory.create(session.getProtocolId());
-            client.init(flowContext);
-            flowContext.client(client);
-            sessions.put(session, flowContext);
-            return flowContext;
-        }
-    }
-
-    protected FlowContext connect(SessionEvent sessionEvent) {
-        return connect(sessionEvent.getSessionInfo());
-    }
-
-    protected FlowContext connect(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = register(session);
-                if (flowContext != null) {
-                    emitter.connect(session, this, index);
-                    return flowContext;
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) {
-                FlowContext flowContext = entry.getValue();
-                try {
-                    closeSession(flowContext);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, e.getMessage(), e);
-                    }
-                }
-            }
-
-            sessions.clear();
-            working = false;
-        }
-    }
-
-    protected void close(SessionEvent sessionEvent) {
-        close(sessionEvent.getSessionInfo());
-    }
-
-    protected void close(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void close(SessionInfo session) {
-        synchronized (lock) {
-            try {
-                FlowContext flowContext = flowContext(session);
-                closeSession(flowContext);
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    protected void closeSession(FlowContext flowContext) {
-        synchronized (lock) {
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
-            }
-        }
-    }
-
-    protected void removeFlowContext(FlowContext flowContext) {
-        synchronized (lock) {
-            debug(flowContext, "removeFlowContext");
-            sessions.remove(flowContext.sessionInfo());
-        }
-    }
-
-    protected void reconnect(FlowContext flowContext) {
-        synchronized (lock) {
-            try {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state()));
-                }
-
-                SessionInfo session = flowContext.sessionInfo();
-                changeFlowState(flowContext, FlowContext.STATE_CONNECTING);
-                emitter.connect(session, this, index);
-            } catch (Exception e) {
-                error(flowContext, e.getMessage(), e);
-            }
-        }
-    }
-
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            for (FlowContext flowContext : sessions.values()) {
-                closeSession(flowContext);
-            }
-        }
-    }
-
-    private void sleepSilently(long millis) {
-        if (millis == 0) {
-            return;
-        }
-        if (millis < 0) {
-            logger.warn("Cannot sleep for negative interval: {}.");
-            return;
-        }
-        logger.debug("Going sleep for: {}.", millis);
-        try {
-            Thread.sleep(millis);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    protected void sleep(Event event) {
-        if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
-            if (lastEventTimestamp != -1) {
-                long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
-                sleepSilently(timeToSleep);
-            }
-            lastEventTimestamp = event.getTimestamp();
-        }
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        synchronized (lock) {
-            super.writeMetrics(container);
-        }
-    }
-
-    @Override
-    public void channelActive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                            context.getLocalAddress(),
-                            context.getRemoteAddress());
-                }
-
-                context.setBidirectional(flowContext.isBidirectional());
-                flowContext.channelContext(context);
-                changeFlowState(flowContext, STATE_CONNECTED);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void channelInactive(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel inactive.");
-                }
-
-                changeFlowState(flowContext, STATE_DISCONNECTED);
-            }
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
-                    FlowHandler client = flowContext.client();
-                    FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                    decoder.decode(data, flowContext);
-                    long now = timeGenerator.currentTimeMillis();
-                    if (flowContext.receivedStartTimestamp() == -1) {
-                        flowContext.receivedStartTimestamp(now);
-                    }
-
-                    if (decoder.state() == DataDecoder.STATE_ERROR) {
-                        if (logger.isDebugEnabled()) {
-                            debug(flowContext, "Decoder error. " + decoder.getLastError());
-                        }
-
-                        decoder.clear(flowContext);
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                        Object resp = decoder.getResult();
-                        Object req = null;
-                        if (flowContext.sentEvent() != null) {
-                            req = flowContext.sentEvent().getRequest();
-                        }
-
-                        if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
-                            try {
-                                fireResponseReceived(req, resp, flowContext);
-                            } catch (Exception e) {
-                                error(flowContext, e.getMessage(), e);
-                            }
-                        }
-
-                        decoder.clear(flowContext);
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    }
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void dataWriteStart(ChannelContext context) {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
-                flowContext.sendStartTimestamp(now);
-                flowContext.client().onDataWriteStart(flowContext);
-            }
-        }
-    }
-
-    @Override
-    public void dataWritten(ChannelContext context) throws Exception {
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
-                long now = timeGenerator.currentTimeMillis();
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
-                    }
-                }
-
-                flowContext.client().onDataWriteEnd(flowContext);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    @Override
-    public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Error occurred. " + cause.getMessage(), cause);
-        }
-
-        synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_ERROR);
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
-        synchronized (lock) {
-            Object req = event.getRequest();
-            if (req != null) {
-                if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
-                    return false;
-                }
-
-                FlowHandler client = flowContext.client();
-                FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
-                ByteBuff buffer = flowContext.buffer();
-                encoder.encode(req, flowContext, buffer);
-
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.incRequestsNum();
-                        metric.addRequestSize(flowContext.buffer().readableBytes());
-                    }
-                }
-
-                try {
-                    changeFlowState(flowContext, FlowContext.STATE_REQ_SENT);
-                    flowContext.sentEvent(event);
-                    flowContext.channelContext().writeAndFlush(buffer);
-                    buffer.clear();
-
-
-
-                    return true;
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, e.getMessage(), e);
-                    }
-                }
-            }
-        }
-
-        return false;
-    }
-
-    protected void processTimeouts() {
-        synchronized (lock) {
-            try {
-                long now = timeGenerator.currentTimeMillis();
-                if (nextCheckTimeoutsTime == -1) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                } else if (nextCheckTimeoutsTime > now) {
-                    nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
-                    for (FlowContext flowContext : sessions.values()) {
-                        if (flowContext.timeouted()) {
-                            if (logger.isDebugEnabled()) {
-                                debug(flowContext, "Flow for session '{}' timed out (state '{}').",
-                                        flowContext.sessionInfo(),
-                                        contextStateToString(flowContext.state()));
-                            }
-
-                            switch (flowContext.state()) {
-                                case FlowContext.STATE_CONNECTING:
-                                case FlowContext.STATE_CONNECTED:
-                                case FlowContext.STATE_REQ_SENT:
-                                case FlowContext.STATE_ERROR:
-                                    closeSession(flowContext);
-                                    break;
-                                case FlowContext.STATE_RESP_RECEIVED:
-                                    //Dziwny blad nie powinien wystepowac
-                                    break;
-                                case FlowContext.STATE_DISCONNECTING:
-                                case STATE_DISCONNECTED:
-                                    removeFlowContext(flowContext);
-                                    break;
-                            }
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    protected Event eventInstanceForWorker(Event event) {
-        if (event instanceof SessionEvent) {
-            Event newEvent = ((SessionEvent) event).instanceForWorker(index);
-            newEvent.setTimestamp(event.getTimestamp());
-            return newEvent;
-        } else {
-            return event;
-        }
-    }
-
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Wed Jul 24 14:19:28 2019 +0200
@@ -12,7 +12,7 @@
 import java.util.concurrent.Semaphore;
 
 @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
-public class ParallelFlowWorker extends FlowWorkerBased {
+public class ParallelFlowWorker extends FlowWorkerBase {
 
     public static final String TYPE = "parallel";
 
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed Jul 24 14:19:28 2019 +0200
@@ -12,7 +12,7 @@
 import static com.passus.st.client.FlowContext.contextStateToString;
 
 @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
-public class SynchFlowWorker extends FlowWorkerBased {
+public class SynchFlowWorker extends FlowWorkerBase {
 
     public static final String TYPE = "synch";
 
--- a/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java	Wed Jul 24 14:19:28 2019 +0200
@@ -10,7 +10,6 @@
 
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE;
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED;
-import static com.passus.st.Protocols.HTTP;
 
 public abstract class BaseSessionAnalyzerListener<T> implements SessionAnalyzerListener<T> {
 
@@ -70,7 +69,7 @@
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                HTTP, context.getProtocol(), context.getId());
+                context.getProtocol(), protocolId, context.getId());
         info.setSourceName(sourceName);
         Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName);
         event.setTimestamp(timestamp);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Wed Jul 24 14:19:28 2019 +0200
@@ -22,10 +22,8 @@
 import com.passus.net.session.TcpSessionProcessor;
 import com.passus.net.session.UdpSessionProcessor;
 import com.passus.net.source.pcap.FrameDecoderImpl;
-import com.passus.st.client.AbstractEvent;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
@@ -37,9 +35,6 @@
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
 
@@ -70,12 +65,8 @@
 
     private PcapThread pcapThread;
 
-    private SessionAnalyzerThread sessionAnalyzerThread;
-
     private EventHandler handler;
 
-    private final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
-
     private boolean allowPartialSession = false;
 
     protected boolean collectMetric;
@@ -235,8 +226,7 @@
     }
 
     public boolean isWorking() {
-        return (pcapThread != null && pcapThread.working)
-                || (sessionAnalyzerThread != null && sessionAnalyzerThread.working);
+        return (pcapThread != null && pcapThread.working);
     }
 
     private static void sleepFor(long millis) {
@@ -259,16 +249,13 @@
         }
 
         try {
-            pcapThread = new PcapThread(pcapFile, loops);
-            sessionAnalyzerThread = new SessionAnalyzerThread(allowPartialSession);
-
+            SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession);
+            pcapThread = new PcapThread(pcapFile, sessionPacketHandler, loops);
             pcapThread.start();
-            sessionAnalyzerThread.start();
             started = true;
         } catch (Exception e) {
             throw new ServiceException(e.getMessage(), e);
         }
-
     }
 
     @Override
@@ -285,29 +272,12 @@
         } catch (Exception ignored) {
         }
 
-        sessionAnalyzerThread.deactivate();
-        sessionAnalyzerThread.interrupt();
-
-        try {
-            sessionAnalyzerThread.join(500);
-        } catch (Exception ignored) {
-        }
-
         pcapThread = null;
-        sessionAnalyzerThread = null;
-        queue.clear();
         started = false;
     }
 
-    public void waitForEmptyQueue() throws InterruptedException {
-        while (!queue.isEmpty()) {
-            Thread.sleep(100);
-        }
-    }
-
     public void join() throws InterruptedException {
         pcapThread.join();
-        waitForEmptyQueue();
     }
 
     private class PcapThread extends Thread {
@@ -320,10 +290,13 @@
 
         private volatile boolean working = false;
 
-        private PcapThread(String pcapFile, int loop) throws IOException {
+        private final SessionPacketHandler sessionPacketHandler;
+
+        private PcapThread(String pcapFile, SessionPacketHandler sessionPacketHandler, int loop) throws IOException {
             super(PcapSessionEventSource.class.getSimpleName() + ".PcapThread");
             reader = new PcapDataBlockReader(pcapFile);
             reader.open();
+            this.sessionPacketHandler = sessionPacketHandler;
             this.loop = loop;
         }
 
@@ -331,7 +304,7 @@
             try {
                 working = false;
                 reader.close();
-                dataBlockHandler.working = false;
+                sessionPacketHandler.deactivate();
             } catch (Exception e) {
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug(e.getMessage(), e);
@@ -339,22 +312,6 @@
             }
         }
 
-        private void offer(Event event) {
-            boolean res = true;
-            do {
-                try {
-                    res = queue.offer(event, 100, TimeUnit.MILLISECONDS);
-                } catch (Exception e) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(e.getMessage(), e);
-                    }
-                }
-            } while (working && !res);
-            if (collectMetric) {
-                metric.incEnqueued();
-            }
-        }
-
         @Override
         public void run() {
             working = true;
@@ -372,19 +329,24 @@
                             metric.incFrames();
                         }
 
-                        dataBlockHandler.handle(dataBlock);
+                        Ip ip = dataBlockHandler.handle(dataBlock);
+                        if (ip != null) {
+                            sessionPacketHandler.handle(ip);
+                        }
                     }
 
                     if (working) {
                         reader.reset();
                     }
 
+                    sessionPacketHandler.onLoopEnd();
                     DataLoopEnd loopEvent = new DataLoopEnd(sourceName, loopNum);
-                    offer(loopEvent);
+                    handler.handle(loopEvent);
                     if (loop != LOOP_INFINITE) {
                         loop--;
                         if (loop == 0) {
-                            offer(new DataEnd(sourceName));
+                            sessionPacketHandler.onDataEnd();
+                            handler.handle(new DataEnd(sourceName));
                             break;
                         }
                     }
@@ -395,7 +357,6 @@
                         sleepFor(loopDelay);
                     }
                 }
-
             } catch (Exception e) {
                 LOGGER.error(e.getMessage(), e);
             }
@@ -407,8 +368,6 @@
 
     private class DataBlockHandler {
 
-        private boolean working = true;
-
         private long frameNum = 0;
 
         private long firstTimestamp = -1;
@@ -425,28 +384,12 @@
             this.loopDelay = loopDelay;
         }
 
-        private void offer(Event event) {
-            boolean res = true;
-            do {
-                try {
-                    res = queue.offer(event, 100, TimeUnit.MILLISECONDS);
-                } catch (Exception e) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(e.getMessage(), e);
-                    }
-                }
-            } while (working && !res);
-            if (collectMetric) {
-                metric.incEnqueued();
-            }
-        }
-
         private void nextLoop() {
             firstTimestamp = -1;
             loopCorrection = relativeTimestamp + loopDelay;
         }
 
-        public void handle(PcapDataBlock dataBlock) {
+        public Ip handle(PcapDataBlock dataBlock) {
             Frame frame = new MemoryFrame(dataBlock.getData(), dataBlock.getTimestamp());
             frameDecoder.decode(frame);
             frame.setNumber(frameNum++);
@@ -466,18 +409,17 @@
                             }
 
                             frame.setTimestamp(relativeTimestamp);
-                            offer(new IpEvent(ip));
+                            return ip;
                         }
                     }
                 }
             }
+
+            return null;
         }
-
     }
 
-    private class SessionAnalyzerThread extends Thread {
-
-        private volatile boolean working = false;
+    private class SessionPacketHandler {
 
         private long sessionFlushPeriod = 500;
 
@@ -493,9 +435,7 @@
 
         private final PcapSessionAnalyzerHookContext hookContext;
 
-        private SessionAnalyzerThread(boolean allowPartialSession) {
-            super(PcapSessionEventSource.class.getSimpleName() + ".SessionAnalyzerThread");
-
+        private SessionPacketHandler(boolean allowPartialSession) {
             tcpProc = new TcpSessionProcessor();
             tcpProc.setTimeGenerator(timeGenerator);
             tcpProc.setAllowPartialSession(allowPartialSession);
@@ -508,90 +448,53 @@
                 PcapSessionAnalyzerHook hook = getHook(analyzer);
                 hook.attach(analyzer, hookContext);
             }
+
+            tcpProc.start();
+            udpProc.start();
+            analyzers.forEach(Service::start);
         }
 
         public void deactivate() {
-            working = false;
             tcpProc.stop();
             udpProc.stop();
             analyzers.forEach(Service::stop);
         }
 
-        @Override
-        public void run() {
-            working = true;
-            tcpProc.start();
-            udpProc.start();
-            analyzers.forEach(Service::start);
-
-            long lastTime = 0;
-            while (working) {
-                try {
-                    Event event = queue.poll(100, TimeUnit.MILLISECONDS);
-
-                    if (event != null) {
-                        if (collectMetric) {
-                            metric.incDequeued();
-                        }
-
-                        LOGGER.trace("queue poll {}", event);
-                        if (event.getType() == IpEvent.TYPE) {
-                            IpEvent ipEvent = (IpEvent) event;
-                            long time = ipEvent.ip.getTimestamp();
-                            timeGenerator.setTimeMillis(time);
-
-                            if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) {
-                                tcpProc.handle(ipEvent.ip);
-                            } else if (ipEvent.ip.getProtocol() == Ip.PROTO_UDP) {
-                                udpProc.handle(ipEvent.ip);
-                            }
+        public void onLoopEnd() {
+            tcpProc.closeAllSessions();
+            eventHandler.flush();
+        }
 
-                            if (collectMetric) {
-                                if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) {
-                                    metric.incTcpPacket();
-                                }
-                            }
+        public void onDataEnd() {
+            eventHandler.flush();
+        }
 
-                            if (nextFlushTime == -1) {
-                                nextFlushTime = time + sessionFlushPeriod;
-                            } else if (nextFlushTime <= time) {
-                                tcpProc.flush();
-                                nextFlushTime = time + sessionFlushPeriod;
-                            }
-                            lastTime = time;
-                        } else if (event.getType() == DataLoopEnd.TYPE) {
-                            event.setTimestamp(lastTime + 1);
-                            eventHandler.handle(event);
-                            tcpProc.closeAllSessions();
-                            eventHandler.flush();
-                        } else if (event.getType() == DataEnd.TYPE) {
-                            event.setTimestamp(lastTime + 1);
-                            eventHandler.handle(event);
-                            eventHandler.flush();
-                            break;
-                        }
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(e.getMessage(), e);
-                    }
-                }
-
+        public void handle(Ip ip) {
+            if (collectMetric) {
+                metric.incDequeued();
             }
 
-            working = false;
-        }
-    }
-
-    private static class IpEvent extends AbstractEvent {
+            long time = ip.getTimestamp();
+            timeGenerator.setTimeMillis(time);
 
-        private static final int TYPE = 1;
+            if (ip.getProtocol() == Ip.PROTO_TCP) {
+                tcpProc.handle(ip);
+                if (collectMetric) {
+                    metric.incTcpPacket();
+                }
+            } else if (ip.getProtocol() == Ip.PROTO_UDP) {
+                udpProc.handle(ip);
+                if (collectMetric) {
+                    metric.incUdpPacket();
+                }
+            }
 
-        private final Ip ip;
-
-        public IpEvent(Ip ip) {
-            super(TYPE, null);
-            this.ip = ip;
+            if (nextFlushTime == -1) {
+                nextFlushTime = time + sessionFlushPeriod;
+            } else if (nextFlushTime <= time) {
+                tcpProc.flush();
+                nextFlushTime = time + sessionFlushPeriod;
+            }
         }
 
     }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java	Wed Jul 24 14:19:28 2019 +0200
@@ -22,6 +22,7 @@
 
     private final MutableInt frames = new MutableInt(0);
     private final MutableInt tcpPackets = new MutableInt(0);
+    private final MutableInt udpPackets = new MutableInt(0);
     private final MutableInt payloads = new MutableInt(0);
     private final AtomicInteger events = new AtomicInteger();
     private final MutableInt eventsEnqueued = new MutableInt();
@@ -79,6 +80,10 @@
         tcpPackets.increment();
     }
 
+    public void incUdpPacket() {
+        udpPackets.increment();
+    }
+
     public void incPayloads() {
         payloads.increment();
     }
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Tue Jul 23 10:44:29 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Wed Jul 24 14:19:28 2019 +0200
@@ -16,6 +16,7 @@
 import org.testng.annotations.Test;
 
 import java.io.File;
+import java.util.Iterator;
 
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertNull;
@@ -95,12 +96,14 @@
         waitForSource(src);
         src.stop();
 
-        SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE);
+        Iterator<Event> it = handler.getEvents().iterator();
+        SessionPayloadEvent payloadEvent = (SessionPayloadEvent) it.next();
 
         assertTrue(payloadEvent.getRequest() instanceof Dns);
         assertTrue(payloadEvent.getResponse() instanceof Dns);
 
-        assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
+        assertTrue(it.next() instanceof DataLoopEnd);
+        assertTrue(it.next() instanceof DataEnd);
     }
 
     @Test(enabled = true)