changeset 533:1822bfe145c6

HttpTimeWindowClientWorker -> HttpAsynchClientWorker
author Devel 2
date Fri, 08 Sep 2017 08:47:02 +0200
parents f78f71168bef
children 91d608d00bca
files stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java
diffstat 4 files changed, 476 insertions(+), 483 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Fri Sep 08 08:47:02 2017 +0200
@@ -0,0 +1,387 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.DataEvents.DataEnd;
+import com.passus.st.client.DataEvents.DataLoopEnd;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionEvent;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.plugin.PluginConstants;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import com.passus.st.client.SessionPayloadEvent;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+@Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
+public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker {
+
+    public static final String TYPE = "asynch";
+
+    private long waitTimeout = 100;
+
+    private long windowPeriod = 10;
+
+    private long windowStartTime = -1;
+
+    private long windowEndTime = -1;
+
+    private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents;
+
+    private final Queue<Event> otherEvents = new LinkedList<>();
+
+    private boolean processData = false;
+
+    public HttpAsynchClientWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+        this.sessionsEvents = new HashMap<>();
+    }
+
+    public long getWindowPeriod() {
+        return windowPeriod;
+    }
+
+    public void setWindowPeriod(long windowPeriod) {
+        Assert.greaterThanZero(windowPeriod, "tickTime");
+        this.windowPeriod = windowPeriod;
+    }
+
+    public long getWaitTimeout() {
+        return waitTimeout;
+    }
+
+    public void setWaitTimeout(long waitTimeout) {
+        Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime");
+        this.waitTimeout = waitTimeout;
+    }
+
+    @Override
+    protected void flowStateChanged(HttpFlowContext context, int oldState) {
+        synchronized (lock) {
+            SessionInfo session = context.sessionInfo();
+            switch (context.state()) {
+                case HttpFlowContext.STATE_RESP_RECEIVED: {
+                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
+                    if (sessionEvents != null
+                            && !sessionEvents.isEmpty()
+                            && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) {
+                        sessionEvents.poll();
+                    }
+
+                    break;
+                }
+                case HttpFlowContext.STATE_DISCONNECTED: {
+                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
+                    if (sessionEvents != null) {
+                        sessionEvents.clear();
+                        sessionsEvents.remove(session);
+                    }
+
+                    break;
+                }
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    private void waitQuietly() {
+        try {
+            lock.wait(waitTimeout);
+        } catch (InterruptedException ignore) {
+        }
+    }
+
+    @Override
+    protected void closeAllConnections() {
+        synchronized (lock) {
+            boolean wait;
+            do {
+                wait = false;
+                for (HttpFlowContext flowContext : sessions.values()) {
+                    if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) {
+                        wait = true;
+                        break;
+                    }
+                }
+
+                if (wait) {
+                    try {
+                        lock.wait(100);
+                    } catch (Exception e) {
+                    }
+                }
+            } while (wait);
+
+            super.closeAllConnections();
+            while (!sessions.isEmpty()) {
+                try {
+                    lock.wait(100);
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    private Queue<SessionEvent> getSessionEvents(SessionInfo session) {
+        return getSessionEvents(session, true);
+    }
+
+    private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) {
+        Queue<SessionEvent> sessionEvents = sessionsEvents.get(session);
+        if (sessionEvents == null && create) {
+            sessionEvents = new ConcurrentLinkedQueue<>();
+            sessionsEvents.put(session, sessionEvents);
+        }
+
+        return sessionEvents;
+    }
+
+    private void addEvent(Event event) {
+        Event newEvent = eventInstanceForWorker(event);
+        switch (newEvent.getType()) {
+            case HttpSessionPayloadEvent.TYPE: {
+                long time = event.getTimestamp();
+                HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent;
+                SessionInfo session = payloadEvent.getSessionInfo();
+                Queue<SessionEvent> sessionEvents = getSessionEvents(session);
+                sessionEvents.add(payloadEvent);
+
+                HttpResponse resp = payloadEvent.getResponse();
+                HttpRequest req = payloadEvent.getRequest();
+                sessionEvents.add(new HttpResponseEvent(session, resp,
+                        time + (resp.getTimestamp() - req.getTimestamp())
+                ));
+
+                break;
+            }
+            case SessionStatusEvent.TYPE: {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
+                Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo());
+                sessionEvents.add(statusEvent);
+                break;
+            }
+            case DataLoopEnd.TYPE:
+            case DataEnd.TYPE:
+                otherEvents.add(event);
+                processData = true;
+                break;
+        }
+    }
+
+    private boolean timeInWindow(long time) {
+        return (windowStartTime <= time && time < windowEndTime);
+    }
+
+    private void clearWindow() {
+        windowStartTime = -1;
+        windowEndTime = -1;
+    }
+
+    @Override
+    public void handle(Event event) {
+        synchronized (lock) {
+            while (processData) {
+                try {
+                    lock.wait(waitTimeout);
+                } catch (InterruptedException ignore) {
+
+                }
+            }
+
+            long time = event.getTimestamp();
+            if (windowEndTime == -1) {
+                int factor = (int) (time / windowPeriod);
+                windowStartTime = factor * windowPeriod;
+                windowEndTime = (factor + 1) * windowPeriod;
+                addEvent(event);
+                return;
+            }
+
+            if (time < windowStartTime) {
+                logger.debug("Event from the past.");
+                return;
+            } else if (!timeInWindow(time)) {
+                processData = true;
+            }
+
+            addEvent(event);
+        }
+    }
+
+    private boolean processSessionEvent(SessionEvent event) {
+        switch (event.getType()) {
+            case SessionStatusEvent.TYPE: {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                    connect(statusEvent);
+                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                    HttpFlowContext flowContext = flowContext(statusEvent);
+                    if (flowContext != null) {
+                        if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
+                            close(statusEvent);
+                        }
+                    }
+                }
+
+                return true;
+            }
+            case HttpSessionPayloadEvent.TYPE: {
+                SessionEvent sessEvent = (SessionEvent) event;
+                HttpFlowContext flowContext = flowContext(sessEvent);
+                if (flowContext != null) {
+                    switch (flowContext.state) {
+                        case HttpFlowContext.STATE_CONNECTED:
+                        case HttpFlowContext.STATE_RESP_RECEIVED:
+                        case HttpFlowContext.STATE_ERROR:
+                            if (send(flowContext, (HttpSessionPayloadEvent) event)) {
+                                return true;
+                            }
+                            break;
+                        case HttpFlowContext.STATE_DISCONNECTING:
+                        case HttpFlowContext.STATE_DISCONNECTED:
+                            if (connectPartialSession) {
+                                connect(sessEvent);
+                            } else {
+                                return true;
+                            }
+                            break;
+                    }
+                } else if (connectPartialSession) {
+                    connect(sessEvent);
+                }
+
+                break;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void run() {
+        synchronized (lock) {
+            working = true;
+            while (working) {
+                try {
+                    while (!processData) {
+                        waitQuietly();
+                    }
+
+                    for (;;) {
+                        boolean breakLoop = true;
+                        for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) {
+                            SessionInfo session = entry.getKey();
+                            Queue<SessionEvent> events = entry.getValue();
+                            if (!events.isEmpty()) {
+                                Event event = events.peek();
+                                if (timeInWindow(event.getTimestamp())) {
+                                    if (processSessionEvent((SessionEvent) event)) {
+                                        events.poll();
+                                    }
+
+                                    breakLoop = false;
+                                }
+                            }
+                        }
+
+                        if (breakLoop) {
+                            break;
+                        } else {
+                            waitQuietly();
+                        }
+                    }
+
+                    if (!otherEvents.isEmpty()) {
+                        Iterator<Event> it = otherEvents.iterator();
+                        while (it.hasNext()) {
+                            Event event = it.next();
+                            if (timeInWindow(event.getTimestamp())) {
+                                it.remove();
+                                switch (event.getType()) {
+                                    case DataLoopEnd.TYPE:
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("DataLoopEnd received.");
+                                        }
+
+                                        closeAllConnections();
+                                        break;
+                                    case DataEnd.TYPE:
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("DataEnd received. Deactivation.");
+                                        }
+
+                                        working = false;
+                                        break;
+                                }
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+
+                    clearWindow();
+                    processData = false;
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> {
+
+        public static final int TYPE = 1011;
+
+        public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) {
+            super(sessionInfo, payload);
+            setTimestamp(timestamp);
+        }
+
+        @Override
+        public SessionEvent instanceForWorker(int index) {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public int getType() {
+            return TYPE;
+        }
+
+    }
+
+    private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> {
+
+        public static final int TYPE = 1012;
+
+        public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) {
+            super(sessionInfo, payload);
+            setTimestamp(timestamp);
+        }
+
+        @Override
+        public SessionEvent instanceForWorker(int index) {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public int getType() {
+            return TYPE;
+        }
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java	Thu Sep 07 15:42:35 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,394 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.Assert;
-import com.passus.commons.annotations.Plugin;
-import com.passus.commons.collection.MixedLinkedList;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.st.client.DataEvents.DataEnd;
-import com.passus.st.client.DataEvents.DataLoopEnd;
-import com.passus.st.client.Event;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.plugin.PluginConstants;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.TreeSet;
-import com.passus.st.client.SessionPayloadEvent;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-@Plugin(name = HttpTimeWindowClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER)
-public class HttpTimeWindowClientWorker extends HttpFlowBasedClientWorker {
-
-    public static final String TYPE = "timeWindow";
-
-    private long waitTimeout = 100;
-
-    private long windowPeriod = 10;
-
-    private long windowStartTime = -1;
-
-    private long windowEndTime = -1;
-
-    private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents;
-
-    private final Queue<Event> otherEvents = new LinkedList<>();
-
-    private boolean processData = false;
-
-    public HttpTimeWindowClientWorker(Emitter emitter, String name, int index) {
-        super(emitter, name, index);
-        this.sessionsEvents = new HashMap<>();
-    }
-
-    public long getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    public void setWindowPeriod(long windowPeriod) {
-        Assert.greaterThanZero(windowPeriod, "tickTime");
-        this.windowPeriod = windowPeriod;
-    }
-
-    public long getWaitTimeout() {
-        return waitTimeout;
-    }
-
-    public void setWaitTimeout(long waitTimeout) {
-        Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime");
-        this.waitTimeout = waitTimeout;
-    }
-
-    @Override
-    protected void flowStateChanged(HttpFlowContext context, int oldState) {
-        synchronized (lock) {
-            SessionInfo session = context.sessionInfo();
-            switch (context.state()) {
-                case HttpFlowContext.STATE_RESP_RECEIVED: {
-                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
-                    if (sessionEvents != null
-                            && !sessionEvents.isEmpty()
-                            && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) {
-                        sessionEvents.poll();
-                    }
-
-                    break;
-                }
-                case HttpFlowContext.STATE_DISCONNECTED: {
-                    Queue<SessionEvent> sessionEvents = getSessionEvents(session, false);
-                    if (sessionEvents != null) {
-                        sessionEvents.clear();
-                        sessionsEvents.remove(session);
-                    }
-
-                    break;
-                }
-            }
-
-            lock.notifyAll();
-        }
-    }
-
-    private void waitQuietly() {
-        try {
-            lock.wait(waitTimeout);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    @Override
-    protected void closeAllConnections() {
-        synchronized (lock) {
-            boolean wait;
-            do {
-                wait = false;
-                for (HttpFlowContext flowContext : sessions.values()) {
-                    if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) {
-                        wait = true;
-                        break;
-                    }
-                }
-
-                if (wait) {
-                    try {
-                        lock.wait(100);
-                    } catch (Exception e) {
-                    }
-                }
-            } while (wait);
-
-            super.closeAllConnections();
-            while (!sessions.isEmpty()) {
-                try {
-                    lock.wait(100);
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    private Queue<SessionEvent> getSessionEvents(SessionInfo session) {
-        return getSessionEvents(session, true);
-    }
-
-    private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) {
-        Queue<SessionEvent> sessionEvents = sessionsEvents.get(session);
-        if (sessionEvents == null && create) {
-            sessionEvents = new ConcurrentLinkedQueue<>();
-            sessionsEvents.put(session, sessionEvents);
-        }
-
-        return sessionEvents;
-    }
-
-    private void addEvent(Event event) {
-        Event newEvent = eventInstanceForWorker(event);
-        switch (newEvent.getType()) {
-            case HttpSessionPayloadEvent.TYPE: {
-                long time = event.getTimestamp();
-                HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent;
-                SessionInfo session = payloadEvent.getSessionInfo();
-                Queue<SessionEvent> sessionEvents = getSessionEvents(session);
-                sessionEvents.add(payloadEvent);
-
-                HttpResponse resp = payloadEvent.getResponse();
-                HttpRequest req = payloadEvent.getRequest();
-                sessionEvents.add(new HttpResponseEvent(session, resp,
-                        time + (resp.getTimestamp() - req.getTimestamp())
-                ));
-
-                break;
-            }
-            case SessionStatusEvent.TYPE: {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
-                Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo());
-                sessionEvents.add(statusEvent);
-                break;
-            }
-            case DataLoopEnd.TYPE:
-            case DataEnd.TYPE:
-                otherEvents.add(event);
-                processData = true;
-                break;
-        }
-    }
-
-    private boolean timeInWindow(long time) {
-        return (windowStartTime <= time && time < windowEndTime);
-    }
-
-    private void clearWindow() {
-        windowStartTime = -1;
-        windowEndTime = -1;
-    }
-
-    @Override
-    public void handle(Event event) {
-        synchronized (lock) {
-            while (processData) {
-                try {
-                    lock.wait(waitTimeout);
-                } catch (InterruptedException ignore) {
-
-                }
-            }
-
-            long time = event.getTimestamp();
-            if (windowEndTime == -1) {
-                int factor = (int) (time / windowPeriod);
-                windowStartTime = factor * windowPeriod;
-                windowEndTime = (factor + 1) * windowPeriod;
-                addEvent(event);
-                return;
-            }
-
-            if (time < windowStartTime) {
-                logger.debug("Event from the past.");
-                return;
-            } else if (!timeInWindow(time)) {
-                processData = true;
-            }
-
-            addEvent(event);
-        }
-    }
-
-    private boolean processSessionEvent(SessionEvent event) {
-        switch (event.getType()) {
-            case SessionStatusEvent.TYPE: {
-                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
-                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                    connect(statusEvent);
-                } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
-                    HttpFlowContext flowContext = flowContext(statusEvent);
-                    if (flowContext != null) {
-                        if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                            close(statusEvent);
-                        }
-                    }
-                }
-
-                return true;
-            }
-            case HttpSessionPayloadEvent.TYPE: {
-                SessionEvent sessEvent = (SessionEvent) event;
-                HttpFlowContext flowContext = flowContext(sessEvent);
-                if (flowContext != null) {
-                    switch (flowContext.state) {
-                        case HttpFlowContext.STATE_CONNECTED:
-                        case HttpFlowContext.STATE_RESP_RECEIVED:
-                        case HttpFlowContext.STATE_ERROR:
-                            if (send(flowContext, (HttpSessionPayloadEvent) event)) {
-                                return true;
-                            }
-                            break;
-                        case HttpFlowContext.STATE_DISCONNECTING:
-                        case HttpFlowContext.STATE_DISCONNECTED:
-                            if (connectPartialSession) {
-                                connect(sessEvent);
-                            } else {
-                                return true;
-                            }
-                            break;
-                    }
-                } else if (connectPartialSession) {
-                    connect(sessEvent);
-                }
-
-                break;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public void run() {
-        synchronized (lock) {
-            working = true;
-            while (working) {
-                try {
-                    while (!processData) {
-                        waitQuietly();
-                    }
-
-                    for (;;) {
-                        boolean breakLoop = true;
-                        for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) {
-                            SessionInfo session = entry.getKey();
-                            Queue<SessionEvent> events = entry.getValue();
-                            if (!events.isEmpty()) {
-                                Event event = events.peek();
-                                if (timeInWindow(event.getTimestamp())) {
-                                    if (processSessionEvent((SessionEvent) event)) {
-                                        events.poll();
-                                    }
-
-                                    breakLoop = false;
-                                }
-                            }
-                        }
-
-                        if (breakLoop) {
-                            break;
-                        } else {
-                            waitQuietly();
-                        }
-                    }
-
-                    if (!otherEvents.isEmpty()) {
-                        Iterator<Event> it = otherEvents.iterator();
-                        while (it.hasNext()) {
-                            Event event = it.next();
-                            if (timeInWindow(event.getTimestamp())) {
-                                it.remove();
-                                switch (event.getType()) {
-                                    case DataLoopEnd.TYPE:
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("DataLoopEnd received.");
-                                        }
-
-                                        closeAllConnections();
-                                        break;
-                                    case DataEnd.TYPE:
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("DataEnd received. Deactivation.");
-                                        }
-
-                                        working = false;
-                                        break;
-                                }
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-
-                    clearWindow();
-                    processData = false;
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(e.getMessage(), e);
-                    }
-                }
-            }
-        }
-    }
-
-    private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> {
-
-        public static final int TYPE = 1011;
-
-        public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) {
-            super(sessionInfo, payload);
-            setTimestamp(timestamp);
-        }
-
-        @Override
-        public SessionEvent instanceForWorker(int index) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public int getType() {
-            return TYPE;
-        }
-
-    }
-
-    private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> {
-
-        public static final int TYPE = 1012;
-
-        public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) {
-            super(sessionInfo, payload);
-            setTimestamp(timestamp);
-        }
-
-        @Override
-        public SessionEvent instanceForWorker(int index) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public int getType() {
-            return TYPE;
-        }
-
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java	Fri Sep 08 08:47:02 2017 +0200
@@ -0,0 +1,89 @@
+package com.passus.st.client.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import com.passus.commons.service.ServiceUtils;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.Log4jConfigurationFactory;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionEvent;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.TestHttpClientListener;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class HttpAsynchClientWorkerTest extends AbstractWireMockTest {
+
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    @BeforeMethod
+    public void beforeMethod() {
+        String content = "test";
+        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", "" + content.length())
+                        .withBody(content)));
+    }
+
+    @Test
+    public void testHandle() throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + PORT);
+        emitter.start();
+
+        TestHttpClientListener listner = new TestHttpClientListener();
+
+        HttpAsynchClientWorker worker = new HttpAsynchClientWorker(emitter, "test", 0);
+        try {
+            worker.setListeners(Arrays.asList(listner));
+            worker.start();
+
+            SessionEvent sessionEvent = (SessionEvent) events.get(0);
+            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED);
+            statusEvent.setTimestamp(sessionEvent.getTimestamp());
+            worker.handle(statusEvent);
+
+            events.forEach((event) -> {
+                worker.handle(event);
+            });
+
+            worker.join();
+            assertTrue(listner.size() > 0);
+            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
+            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
+            String responseStr = event.getResponse().toString();
+            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
+            assertTrue(responseStr.endsWith("test"));
+        } finally {
+            ServiceUtils.stopQuietly(emitter);
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java	Thu Sep 07 15:42:35 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,89 +0,0 @@
-package com.passus.st.client.http;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
-import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import com.passus.commons.service.ServiceUtils;
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.Log4jConfigurationFactory;
-import com.passus.st.client.Event;
-import com.passus.st.client.SessionEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.TestHttpClientListener;
-import com.passus.st.emitter.RuleBasedSessionMapper;
-import com.passus.st.emitter.nio.NioEmitter;
-import com.passus.st.utils.EventUtils;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class HttpTimeWindowClientWorkerTest extends AbstractWireMockTest {
-
-    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
-        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
-        sessionMapper.addRule(mapperRule);
-
-        NioEmitter emitter = new NioEmitter();
-        emitter.setSessionMapper(sessionMapper);
-        return emitter;
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
-        String content = "test";
-        stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html"))
-                .willReturn(aResponse()
-                        .withHeader("Content-Type", "text/plain")
-                        .withHeader("Content-Length", "" + content.length())
-                        .withBody(content)));
-    }
-
-    @Test
-    public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
-        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
-        assertEquals(4, events.size());
-
-        NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + PORT);
-        emitter.start();
-
-        TestHttpClientListener listner = new TestHttpClientListener();
-
-        HttpTimeWindowClientWorker worker = new HttpTimeWindowClientWorker(emitter, "test", 0);
-        try {
-            worker.setListeners(Arrays.asList(listner));
-            worker.start();
-
-            SessionEvent sessionEvent = (SessionEvent) events.get(0);
-            SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED);
-            statusEvent.setTimestamp(sessionEvent.getTimestamp());
-            worker.handle(statusEvent);
-
-            events.forEach((event) -> {
-                worker.handle(event);
-            });
-
-            worker.join();
-            assertTrue(listner.size() > 0);
-            assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
-            TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
-            String responseStr = event.getResponse().toString();
-            assertTrue(responseStr.startsWith("HTTP/1.1 200 OK"));
-            assertTrue(responseStr.endsWith("test"));
-        } finally {
-            ServiceUtils.stopQuietly(emitter);
-        }
-    }
-
-}