changeset 532:f78f71168bef

HttpTimeWindowClientWorker in progress
author Devel 2
date Thu, 07 Sep 2017 15:42:35 +0200
parents 229736339e0c
children 1822bfe145c6
files stress-tester/src/main/java/com/passus/st/PcapReporter.java stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMvelFilter.java stress-tester/src/test/java/com/passus/st/client/EventTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java
diffstat 11 files changed, 509 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu Sep 07 15:42:35 2017 +0200
@@ -233,7 +233,7 @@
         @Override
         public void handle(Event event) {
             if (event instanceof SessionEvent) {
-                if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) {
+                if (event.getType() == SessionStatusEvent.TYPE) {
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                     if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
                         try {
@@ -253,7 +253,7 @@
                         }
                     }
 
-                } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) {
+                } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
                     HttpSessionPayloadEvent sessEvent = (HttpSessionPayloadEvent) event;
                     currFlowContext = flowContext(sessEvent);
                     if (currFlowContext == null && partialSession) {
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Thu Sep 07 15:42:35 2017 +0200
@@ -8,7 +8,7 @@
  */
 public class SessionStatusEvent extends SessionEvent {
 
-    public static final int HTTP_SESSION_STATUS = 11;
+    public static final int TYPE = 11;
 
     public static final int STATUS_OPENING = 1;
     public static final int STATUS_ESTABLISHED = 2;
@@ -33,7 +33,7 @@
 
     @Override
     public int getType() {
-        return HTTP_SESSION_STATUS;
+        return TYPE;
     }
 
     public int getStatus() {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Thu Sep 07 15:42:35 2017 +0200
@@ -530,7 +530,7 @@
             lock.notifyAll();
         }
     }
-
+  
     protected boolean send(HttpFlowContext context, HttpSessionPayloadEvent event) {
         synchronized (lock) {
             if (event.getRequest() != null) {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java	Thu Sep 07 15:42:35 2017 +0200
@@ -165,13 +165,13 @@
                 && !localFlowContext.eventsQueue.isEmpty()) {
 
             Event event = localFlowContext.eventsQueue.peek();
-            if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) {
+            if (event.getType() == SessionStatusEvent.TYPE) {
                 SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                 if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     localFlowContext.eventsQueue.poll();
                     close((SessionStatusEvent) event);
                 }
-            } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD
+            } else if (event.getType() == HttpSessionPayloadEvent.TYPE
                     && canSend(flowContext)) {
                 localFlowContext.eventsQueue.poll();
                 send(flowContext, (HttpSessionPayloadEvent) event);
@@ -196,7 +196,7 @@
     private void processEvent(Event event) {
         if (event instanceof SessionEvent) {
             switch (event.getType()) {
-                case SessionStatusEvent.HTTP_SESSION_STATUS:
+                case SessionStatusEvent.TYPE:
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                     if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                         LocalHttpFlowContext flowContext = flowContext((SessionEvent) event);
@@ -210,7 +210,7 @@
                         }
                     }
                     break;
-                case HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD: {
+                case HttpSessionPayloadEvent.TYPE: {
                     HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
                     LocalHttpFlowContext flowContext = flowContext(payloadEvent);
                     if (flowContext != null) {
@@ -261,11 +261,11 @@
     public void handle(Event event) {
         Event newEvent = null;
         switch (event.getType()) {
-            case HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD:
+            case HttpSessionPayloadEvent.TYPE:
                 semaphore.acquireUninterruptibly();
                 newEvent = eventInstanceForWorker(event);
                 break;
-            case SessionStatusEvent.HTTP_SESSION_STATUS:
+            case SessionStatusEvent.TYPE:
             case DataLoopEnd.TYPE:
             case DataEnd.TYPE:
                 newEvent = event;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Thu Sep 07 15:42:35 2017 +0200
@@ -13,7 +13,7 @@
  */
 public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpReqResp> {
 
-    public static final int HTTP_SESSION_PAYLOAD = 12;
+    public static final int TYPE = 12;
 
     public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp) {
         this(sessionInfo, new HttpReqResp(req, resp));
@@ -33,7 +33,7 @@
 
     @Override
     public int getType() {
-        return HTTP_SESSION_PAYLOAD;
+        return TYPE;
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Thu Sep 07 15:42:35 2017 +0200
@@ -123,7 +123,7 @@
             }
 
             if (event instanceof SessionEvent) {
-                if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) {
+                if (event.getType() == SessionStatusEvent.TYPE) {
                     SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                     if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
                         try {
@@ -143,7 +143,7 @@
                     }
 
                     return true;
-                } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) {
+                } else if (event.getType() == HttpSessionPayloadEvent.TYPE) {
                     SessionEvent sessEvent = (SessionEvent) event;
                     HttpFlowContext flowContext = flowContext(sessEvent);
                     if (flowContext != null) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java	Thu Sep 07 15:42:35 2017 +0200
@@ -0,0 +1,394 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.commons.collection.MixedLinkedList;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.DataEvents.DataEnd;
+import com.passus.st.client.DataEvents.DataLoopEnd;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionEvent;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.plugin.PluginConstants;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.TreeSet;
+import com.passus.st.client.SessionPayloadEvent;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
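+ * Client worker that queues incoming events per session and processes them one time window at a time.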
+ * @author Mirosław Hawrot
+ */
+@Plugin(name = HttpTimeWindowClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
+public class HttpTimeWindowClientWorker extends HttpFlowBasedClientWorker {
+
+    public static final String TYPE = "timeWindow";
+
+    private long waitTimeout = 100;
+
+    private long windowPeriod = 10;
+
+    private long windowStartTime = -1;
+
+    private long windowEndTime = -1;
+
+    private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents;
+
+    private final Queue<Event> otherEvents = new LinkedList<>();
+
+    private boolean processData = false;
+
+    public HttpTimeWindowClientWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+        this.sessionsEvents = new HashMap<>(); // one queue of pending events per session, filled by addEvent()
+    }
+
+    public long getWindowPeriod() {
+        return windowPeriod; // width of one window, in the units of Event.getTimestamp()
+    }
+
+    public void setWindowPeriod(long windowPeriod) {
+        Assert.greaterThanZero(windowPeriod, "windowPeriod"); // label (presumably echoed on failure) now matches the argument; was "tickTime"
+        this.windowPeriod = windowPeriod;
+    }
+
+    public long getWaitTimeout() {
+        return waitTimeout; // handed to lock.wait(long), i.e. milliseconds
+    }
+
+    public void setWaitTimeout(long waitTimeout) {
+        Assert.greaterThanZero(waitTimeout, "waitTimeout"); // label (presumably echoed on failure) now matches the argument; was "eventsQueueWaitTime"
+        this.waitTimeout = waitTimeout;
+    }
+
+    @Override
+    protected void flowStateChanged(HttpFlowContext context, int oldState) {
+        synchronized (lock) {
+            SessionInfo session = context.sessionInfo();
+            switch (context.state()) {
+                case HttpFlowContext.STATE_RESP_RECEIVED: { // drop the HttpResponseEvent placeholder, but only if it heads this session's queue
+                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
+                    if (sessionEvents != null
+                            && !sessionEvents.isEmpty()
+                            && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) {
+                        sessionEvents.poll();
+                    }
+
+                    break;
+                }
+                case HttpFlowContext.STATE_DISCONNECTED: { // discard everything still queued for the session and forget its queue
+                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
+                    if (sessionEvents != null) {
+                        sessionEvents.clear();
+                        sessionsEvents.remove(session);
+                    }
+
+                    break;
+                }
+            }
+
+            lock.notifyAll(); // wake run(), handle() and closeAllConnections(), which all wait on this lock
+        }
+    }
+
+    private void waitQuietly() {
+        try {
+            lock.wait(waitTimeout); // caller must already own the lock's monitor
+        } catch (InterruptedException ignore) { // treated like a timeout; the interrupt status is not restored
+        }
+    }
+
+    @Override
+    protected void closeAllConnections() {
+        synchronized (lock) {
+            boolean wait;
+            do { // phase 1: poll every 100 ms until no flow is in STATE_REQ_SENT (there is no overall deadline)
+                wait = false;
+                for (HttpFlowContext flowContext : sessions.values()) {
+                    if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) {
+                        wait = true;
+                        break;
+                    }
+                }
+
+                if (wait) {
+                    try {
+                        lock.wait(100);
+                    } catch (Exception e) { // any exception from wait() is ignored and the scan simply repeats
+                    }
+                }
+            } while (wait);
+
+            super.closeAllConnections();
+            while (!sessions.isEmpty()) { // phase 2: poll until the sessions map has been emptied
+                try {
+                    lock.wait(100);
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    private Queue<SessionEvent> getSessionEvents(SessionInfo session) { // convenience overload: creates the queue when it is missing
+        return getSessionEvents(session, true);
+    }
+
+    private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) { // returns null when absent and create is false
+        Queue<SessionEvent> sessionEvents = sessionsEvents.get(session);
+        if (sessionEvents == null && create) {
+            sessionEvents = new ConcurrentLinkedQueue<>();
+            sessionsEvents.put(session, sessionEvents);
+        }
+
+        return sessionEvents;
+    }
+
+    private void addEvent(Event event) { // files one incoming event into its session queue or into otherEvents
+        Event newEvent = eventInstanceForWorker(event);
+        switch (newEvent.getType()) {
+            case HttpSessionPayloadEvent.TYPE: {
+                long time = event.getTimestamp();
+                HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent;
+                SessionInfo session = payloadEvent.getSessionInfo();
+                Queue<SessionEvent> sessionEvents = getSessionEvents(session);
+                sessionEvents.add(payloadEvent);
+
+                HttpResponse resp = payloadEvent.getResponse();
+                HttpRequest req = payloadEvent.getRequest(); // NOTE(review): a null resp or req makes the next statement throw NPE - confirm both are always set
+                sessionEvents.add(new HttpResponseEvent(session, resp, // placeholder queued right behind the payload event
+                        time + (resp.getTimestamp() - req.getTimestamp()) // event time shifted by the response-minus-request timestamp gap
+                ));
+
+                break;
+            }
+            case SessionStatusEvent.TYPE: {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
+                Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo());
+                sessionEvents.add(statusEvent);
+                break;
+            }
+            case DataLoopEnd.TYPE:
+            case DataEnd.TYPE:
+                otherEvents.add(event); // NOTE(review): stores the original event rather than newEvent - confirm intended
+                processData = true; // releases run(), which waits while processData is false
+                break;
+        }
+    }
+
+    private boolean timeInWindow(long time) { // membership test for the half-open interval [windowStartTime, windowEndTime)
+        return (windowStartTime <= time && time < windowEndTime);
+    }
+
+    private void clearWindow() { // -1 means "no window open"; handle() opens the next one when it sees windowEndTime == -1
+        windowStartTime = -1;
+        windowEndTime = -1;
+    }
+
+    @Override
+    public void handle(Event event) {
+        synchronized (lock) {
+            while (processData) { // hold the caller back while run() is still working through the current window
+                try {
+                    lock.wait(waitTimeout);
+                } catch (InterruptedException ignore) {
+
+                }
+            }
+
+            long time = event.getTimestamp();
+            if (windowEndTime == -1) { // no window open yet: align a new one to a multiple of windowPeriod around this event
+                long factor = time / windowPeriod; // stays long: narrowing the quotient to int corrupted the bounds for large timestamps
+                windowStartTime = factor * windowPeriod;
+                windowEndTime = (factor + 1) * windowPeriod;
+                addEvent(event);
+                return;
+            }
+
+            if (time < windowStartTime) { // older than the open window: dropped
+                logger.debug("Event from the past.");
+                return;
+            } else if (!timeInWindow(time)) { // first event past the window end: queue it and let run() process the window
+                processData = true;
+            }
+
+            addEvent(event);
+        }
+    }
+
+    private boolean processSessionEvent(SessionEvent event) { // true = run() may dequeue the event; false = it stays at the head of its queue
+        switch (event.getType()) {
+            case SessionStatusEvent.TYPE: {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                    connect(statusEvent);
+                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                    HttpFlowContext flowContext = flowContext(statusEvent);
+                    if (flowContext != null) {
+                        if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { // close() is skipped while the flow is in STATE_REQ_SENT
+                            close(statusEvent);
+                        }
+                    }
+                }
+
+                return true; // status events are always consumed, including a CLOSED that was not acted on
+            }
+            case HttpSessionPayloadEvent.TYPE: {
+                SessionEvent sessEvent = (SessionEvent) event;
+                HttpFlowContext flowContext = flowContext(sessEvent);
+                if (flowContext != null) {
+                    switch (flowContext.state) {
+                        case HttpFlowContext.STATE_CONNECTED:
+                        case HttpFlowContext.STATE_RESP_RECEIVED:
+                        case HttpFlowContext.STATE_ERROR:
+                            if (send(flowContext, (HttpSessionPayloadEvent) event)) { // consumed only when send() reports success
+                                return true;
+                            }
+                            break;
+                        case HttpFlowContext.STATE_DISCONNECTING:
+                        case HttpFlowContext.STATE_DISCONNECTED:
+                            if (connectPartialSession) {
+                                connect(sessEvent); // event stays queued (falls through to return false) for a later pass
+                            } else {
+                                return true; // no reconnect allowed: the payload is dropped
+                            }
+                            break;
+                    }
+                } else if (connectPartialSession) {
+                    connect(sessEvent);
+                } // NOTE(review): with no flow context and connectPartialSession false the event is never consumed - confirm intended
+
+                break;
+            }
+        }
+
+        return false; // also reached for any other event type, e.g. the HttpResponseEvent placeholder
+    }
+
+    @Override
+    public void run() {
+        synchronized (lock) { // the monitor is held for the whole loop and released only inside lock.wait()
+            working = true;
+            while (working) {
+                try {
+                    while (!processData) { // idle until handle()/addEvent() flags that a window is ready
+                        waitQuietly();
+                    }
+
+                    for (;;) { // repeat passes over all sessions until no queue head falls inside the window
+                        boolean breakLoop = true;
+                        for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) {
+                            SessionInfo session = entry.getKey();
+                            Queue<SessionEvent> events = entry.getValue();
+                            if (!events.isEmpty()) {
+                                Event event = events.peek(); // only the head of each session queue is considered per pass
+                                if (timeInWindow(event.getTimestamp())) {
+                                    if (processSessionEvent((SessionEvent) event)) {
+                                        events.poll();
+                                    }
+
+                                    breakLoop = false; // an in-window head existed, so another pass is needed even if it was not consumed
+                                }
+                            }
+                        }
+
+                        if (breakLoop) {
+                            break;
+                        } else {
+                            waitQuietly(); // releases the lock so flowStateChanged() can run before the next pass
+                        }
+                    }
+
+                    if (!otherEvents.isEmpty()) { // end-of-data markers are handled after the session events of the window
+                        Iterator<Event> it = otherEvents.iterator();
+                        while (it.hasNext()) {
+                            Event event = it.next();
+                            if (timeInWindow(event.getTimestamp())) {
+                                it.remove();
+                                switch (event.getType()) {
+                                    case DataLoopEnd.TYPE:
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("DataLoopEnd received.");
+                                        }
+
+                                        closeAllConnections();
+                                        break;
+                                    case DataEnd.TYPE:
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("DataEnd received. Deactivation.");
+                                        }
+
+                                        working = false; // ends the outer loop once this window is finished
+                                        break;
+                                }
+                            } else {
+                                break; // first marker outside the window stops the scan; later markers stay queued
+                            }
+                        }
+                    }
+
+                    clearWindow();
+                    processData = false; // lets handle() proceed; it re-checks the flag after each wait timeout
+                } catch (Exception e) { // logged at debug level only; the loop then continues
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> { // not instantiated anywhere in this class as shown
+
+        public static final int TYPE = 1011;
+
+        public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) {
+            super(sessionInfo, payload);
+            setTimestamp(timestamp);
+        }
+
+        @Override
+        public SessionEvent instanceForWorker(int index) { // always throws
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public int getType() {
+            return TYPE;
+        }
+
+    }
+
+    private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { // placeholder queued by addEvent(), removed by flowStateChanged()
+
+        public static final int TYPE = 1012;
+
+        public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) {
+            super(sessionInfo, payload);
+            setTimestamp(timestamp);
+        }
+
+        @Override
+        public SessionEvent instanceForWorker(int index) { // always throws
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public int getType() {
+            return TYPE;
+        }
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMvelFilter.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMvelFilter.java	Thu Sep 07 15:42:35 2017 +0200
@@ -94,6 +94,10 @@
         vars.put("$req", req);
         vars.put("$resp", resp);
         vars.put("$context", context);
+        if (context != null) {
+            vars.put("$scopes", context.scopes());
+            vars.put("$httpSession", context.scopes().getSession(req));
+        }
 
         CachingMapVariableResolverFactory factory = new CachingMapVariableResolverFactory(vars);
         if (globalFactory != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/EventTest.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/EventTest.java	Thu Sep 07 15:42:35 2017 +0200
@@ -3,7 +3,9 @@
 import com.passus.commons.Loader;
 import com.passus.commons.Resolver;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.Parameter;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
@@ -42,16 +44,11 @@
     private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws ReflectiveOperationException {
         Map<Class, Integer> types = new LinkedHashMap<>(classes.size());
         for (final Class<?> clazz : classes) {
-            Constructor<?> ctor;
-            try {
-                ctor = clazz.getDeclaredConstructor();
-            } catch (NoSuchMethodException no) {
-                ctor = clazz.getConstructors()[0];
+            Field field = clazz.getDeclaredField("TYPE");
+            if (Modifier.isStatic(field.getModifiers())) { // field.get(null) below is only valid for a static field
+                field.setAccessible(true); // isAccessible() merely echoes this flag (false by default), so the old guard skipped every class
+                types.put(clazz, (Integer) field.get(null));
             }
-            ctor.setAccessible(true);
-            int params = ctor.getParameterCount();
-            Event e = (Event) ctor.newInstance(new Object[params]);
-            types.put(clazz, e.getType());
         }
         return types;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java	Thu Sep 07 15:42:35 2017 +0200
@@ -0,0 +1,89 @@
+package com.passus.st.client.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import com.passus.commons.service.ServiceUtils;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.Log4jConfigurationFactory;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionEvent;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.TestHttpClientListener;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises {@link HttpTimeWindowClientWorker} against a WireMock stub.
+ * @author Mirosław Hawrot
+ */
+public class HttpTimeWindowClientWorkerTest extends AbstractWireMockTest {
+
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception { // returns an emitter that is configured but not yet started
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    @BeforeMethod
+    public void beforeMethod() { // stubs the POST endpoint so it answers with the plain-text body "test"
+        String content = "test";
+        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", "" + content.length())
+                        .withBody(content)));
+    }
+
+    @Test
+    public void testHandle() throws Exception { // feeds recorded events through the worker and checks the first response the listener saw
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + PORT);
+        emitter.start();
+
+        TestHttpClientListener listner = new TestHttpClientListener();
+
+        HttpTimeWindowClientWorker worker = new HttpTimeWindowClientWorker(emitter, "test", 0);
+        try {
+            worker.setListeners(Arrays.asList(listner));
+            worker.start();
+
+            SessionEvent sessionEvent = (SessionEvent) events.get(0);
+            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); // synthetic ESTABLISHED event for the first event's session
+            statusEvent.setTimestamp(sessionEvent.getTimestamp());
+            worker.handle(statusEvent); // handed to the worker ahead of the recorded events
+
+            events.forEach((event) -> {
+                worker.handle(event);
+            });
+
+            worker.join(); // no timeout: blocks until the worker thread ends
+            assertTrue(listner.size() > 0);
+            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
+            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
+            String responseStr = event.getResponse().toString();
+            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
+            assertTrue(responseStr.endsWith("test"));
+        } finally {
+            ServiceUtils.stopQuietly(emitter);
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Fri Sep 01 09:59:19 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Thu Sep 07 15:42:35 2017 +0200
@@ -33,7 +33,7 @@
         src.stop();
 
         assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent);
-        assertTrue(handler.findFirst(HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) instanceof HttpSessionPayloadEvent);
+        assertTrue(handler.findFirst(HttpSessionPayloadEvent.TYPE) instanceof HttpSessionPayloadEvent);
         assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd);
         assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
     }