Mercurial > stress-tester
changeset 533:1822bfe145c6
HttpTimeWindowClientWorker -> HttpAsynchClientWorker
author | Devel 2 |
---|---|
date | Fri, 08 Sep 2017 08:47:02 +0200 |
parents | f78f71168bef |
children | 91d608d00bca |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java |
diffstat | 4 files changed, 476 insertions(+), 483 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Fri Sep 08 08:47:02 2017 +0200 @@ -0,0 +1,387 @@ +package com.passus.st.client.http; + +import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.client.DataEvents.DataEnd; +import com.passus.st.client.DataEvents.DataLoopEnd; +import com.passus.st.client.Event; +import com.passus.st.client.SessionEvent; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.plugin.PluginConstants; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import com.passus.st.client.SessionPayloadEvent; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * + * @author Mirosław Hawrot + */ +@Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) +public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker { + + public static final String TYPE = "asynch"; + + private long waitTimeout = 100; + + private long windowPeriod = 10; + + private long windowStartTime = -1; + + private long windowEndTime = -1; + + private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents; + + private final Queue<Event> otherEvents = new LinkedList<>(); + + private boolean processData = false; + + public HttpAsynchClientWorker(Emitter emitter, String name, int index) { + super(emitter, name, index); + this.sessionsEvents = new HashMap<>(); + } + + public long getWindowPeriod() { + return windowPeriod; + } + + public void setWindowPeriod(long windowPeriod) { + Assert.greaterThanZero(windowPeriod, "tickTime"); + this.windowPeriod = windowPeriod; + } + + public long getWaitTimeout() { + return waitTimeout; + } + + 
public void setWaitTimeout(long waitTimeout) { + Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); + this.waitTimeout = waitTimeout; + } + + @Override + protected void flowStateChanged(HttpFlowContext context, int oldState) { + synchronized (lock) { + SessionInfo session = context.sessionInfo(); + switch (context.state()) { + case HttpFlowContext.STATE_RESP_RECEIVED: { + Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); + if (sessionEvents != null + && !sessionEvents.isEmpty() + && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) { + sessionEvents.poll(); + } + + break; + } + case HttpFlowContext.STATE_DISCONNECTED: { + Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); + if (sessionEvents != null) { + sessionEvents.clear(); + sessionsEvents.remove(session); + } + + break; + } + } + + lock.notifyAll(); + } + } + + private void waitQuietly() { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + } + } + + @Override + protected void closeAllConnections() { + synchronized (lock) { + boolean wait; + do { + wait = false; + for (HttpFlowContext flowContext : sessions.values()) { + if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) { + wait = true; + break; + } + } + + if (wait) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } while (wait); + + super.closeAllConnections(); + while (!sessions.isEmpty()) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } + } + + private Queue<SessionEvent> getSessionEvents(SessionInfo session) { + return getSessionEvents(session, true); + } + + private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) { + Queue<SessionEvent> sessionEvents = sessionsEvents.get(session); + if (sessionEvents == null && create) { + sessionEvents = new ConcurrentLinkedQueue<>(); + sessionsEvents.put(session, sessionEvents); + } + + return sessionEvents; + } + + private void addEvent(Event event) { + Event newEvent = 
eventInstanceForWorker(event); + switch (newEvent.getType()) { + case HttpSessionPayloadEvent.TYPE: { + long time = event.getTimestamp(); + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent; + SessionInfo session = payloadEvent.getSessionInfo(); + Queue<SessionEvent> sessionEvents = getSessionEvents(session); + sessionEvents.add(payloadEvent); + + HttpResponse resp = payloadEvent.getResponse(); + HttpRequest req = payloadEvent.getRequest(); + sessionEvents.add(new HttpResponseEvent(session, resp, + time + (resp.getTimestamp() - req.getTimestamp()) + )); + + break; + } + case SessionStatusEvent.TYPE: { + SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; + Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo()); + sessionEvents.add(statusEvent); + break; + } + case DataLoopEnd.TYPE: + case DataEnd.TYPE: + otherEvents.add(event); + processData = true; + break; + } + } + + private boolean timeInWindow(long time) { + return (windowStartTime <= time && time < windowEndTime); + } + + private void clearWindow() { + windowStartTime = -1; + windowEndTime = -1; + } + + @Override + public void handle(Event event) { + synchronized (lock) { + while (processData) { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + + } + } + + long time = event.getTimestamp(); + if (windowEndTime == -1) { + int factor = (int) (time / windowPeriod); + windowStartTime = factor * windowPeriod; + windowEndTime = (factor + 1) * windowPeriod; + addEvent(event); + return; + } + + if (time < windowStartTime) { + logger.debug("Event from the past."); + return; + } else if (!timeInWindow(time)) { + processData = true; + } + + addEvent(event); + } + } + + private boolean processSessionEvent(SessionEvent event) { + switch (event.getType()) { + case SessionStatusEvent.TYPE: { + SessionStatusEvent statusEvent = (SessionStatusEvent) event; + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + 
connect(statusEvent); + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + HttpFlowContext flowContext = flowContext(statusEvent); + if (flowContext != null) { + if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { + close(statusEvent); + } + } + } + + return true; + } + case HttpSessionPayloadEvent.TYPE: { + SessionEvent sessEvent = (SessionEvent) event; + HttpFlowContext flowContext = flowContext(sessEvent); + if (flowContext != null) { + switch (flowContext.state) { + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_RESP_RECEIVED: + case HttpFlowContext.STATE_ERROR: + if (send(flowContext, (HttpSessionPayloadEvent) event)) { + return true; + } + break; + case HttpFlowContext.STATE_DISCONNECTING: + case HttpFlowContext.STATE_DISCONNECTED: + if (connectPartialSession) { + connect(sessEvent); + } else { + return true; + } + break; + } + } else if (connectPartialSession) { + connect(sessEvent); + } + + break; + } + } + + return false; + } + + @Override + public void run() { + synchronized (lock) { + working = true; + while (working) { + try { + while (!processData) { + waitQuietly(); + } + + for (;;) { + boolean breakLoop = true; + for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) { + SessionInfo session = entry.getKey(); + Queue<SessionEvent> events = entry.getValue(); + if (!events.isEmpty()) { + Event event = events.peek(); + if (timeInWindow(event.getTimestamp())) { + if (processSessionEvent((SessionEvent) event)) { + events.poll(); + } + + breakLoop = false; + } + } + } + + if (breakLoop) { + break; + } else { + waitQuietly(); + } + } + + if (!otherEvents.isEmpty()) { + Iterator<Event> it = otherEvents.iterator(); + while (it.hasNext()) { + Event event = it.next(); + if (timeInWindow(event.getTimestamp())) { + it.remove(); + switch (event.getType()) { + case DataLoopEnd.TYPE: + if (logger.isDebugEnabled()) { + logger.debug("DataLoopEnd received."); + } + + closeAllConnections(); 
+ break; + case DataEnd.TYPE: + if (logger.isDebugEnabled()) { + logger.debug("DataEnd received. Deactivation."); + } + + working = false; + break; + } + } else { + break; + } + } + } + + clearWindow(); + processData = false; + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + } + + private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> { + + public static final int TYPE = 1011; + + public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) { + super(sessionInfo, payload); + setTimestamp(timestamp); + } + + @Override + public SessionEvent instanceForWorker(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getType() { + return TYPE; + } + + } + + private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { + + public static final int TYPE = 1012; + + public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) { + super(sessionInfo, payload); + setTimestamp(timestamp); + } + + @Override + public SessionEvent instanceForWorker(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getType() { + return TYPE; + } + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java Thu Sep 07 15:42:35 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,394 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.commons.collection.MixedLinkedList; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.DataEvents.DataEnd; -import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.Event; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.plugin.PluginConstants; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Queue; -import java.util.TreeSet; -import com.passus.st.client.SessionPayloadEvent; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * - * @author Mirosław Hawrot - */ -@Plugin(name = HttpTimeWindowClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) -public class HttpTimeWindowClientWorker extends HttpFlowBasedClientWorker { - - public static final String TYPE = "timeWindow"; - - private long waitTimeout = 100; - - private long windowPeriod = 10; - - private long windowStartTime = -1; - - private long windowEndTime = -1; - - private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents; - - private final Queue<Event> otherEvents = new LinkedList<>(); - - private boolean processData = false; - - public HttpTimeWindowClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - this.sessionsEvents = new HashMap<>(); - } - - public long getWindowPeriod() { - return 
windowPeriod; - } - - public void setWindowPeriod(long windowPeriod) { - Assert.greaterThanZero(windowPeriod, "tickTime"); - this.windowPeriod = windowPeriod; - } - - public long getWaitTimeout() { - return waitTimeout; - } - - public void setWaitTimeout(long waitTimeout) { - Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); - this.waitTimeout = waitTimeout; - } - - @Override - protected void flowStateChanged(HttpFlowContext context, int oldState) { - synchronized (lock) { - SessionInfo session = context.sessionInfo(); - switch (context.state()) { - case HttpFlowContext.STATE_RESP_RECEIVED: { - Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); - if (sessionEvents != null - && !sessionEvents.isEmpty() - && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) { - sessionEvents.poll(); - } - - break; - } - case HttpFlowContext.STATE_DISCONNECTED: { - Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); - if (sessionEvents != null) { - sessionEvents.clear(); - sessionsEvents.remove(session); - } - - break; - } - } - - lock.notifyAll(); - } - } - - private void waitQuietly() { - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { - } - } - - @Override - protected void closeAllConnections() { - synchronized (lock) { - boolean wait; - do { - wait = false; - for (HttpFlowContext flowContext : sessions.values()) { - if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) { - wait = true; - break; - } - } - - if (wait) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } while (wait); - - super.closeAllConnections(); - while (!sessions.isEmpty()) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } - } - - private Queue<SessionEvent> getSessionEvents(SessionInfo session) { - return getSessionEvents(session, true); - } - - private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) { - Queue<SessionEvent> sessionEvents = sessionsEvents.get(session); - if 
(sessionEvents == null && create) { - sessionEvents = new ConcurrentLinkedQueue<>(); - sessionsEvents.put(session, sessionEvents); - } - - return sessionEvents; - } - - private void addEvent(Event event) { - Event newEvent = eventInstanceForWorker(event); - switch (newEvent.getType()) { - case HttpSessionPayloadEvent.TYPE: { - long time = event.getTimestamp(); - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent; - SessionInfo session = payloadEvent.getSessionInfo(); - Queue<SessionEvent> sessionEvents = getSessionEvents(session); - sessionEvents.add(payloadEvent); - - HttpResponse resp = payloadEvent.getResponse(); - HttpRequest req = payloadEvent.getRequest(); - sessionEvents.add(new HttpResponseEvent(session, resp, - time + (resp.getTimestamp() - req.getTimestamp()) - )); - - break; - } - case SessionStatusEvent.TYPE: { - SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; - Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo()); - sessionEvents.add(statusEvent); - break; - } - case DataLoopEnd.TYPE: - case DataEnd.TYPE: - otherEvents.add(event); - processData = true; - break; - } - } - - private boolean timeInWindow(long time) { - return (windowStartTime <= time && time < windowEndTime); - } - - private void clearWindow() { - windowStartTime = -1; - windowEndTime = -1; - } - - @Override - public void handle(Event event) { - synchronized (lock) { - while (processData) { - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { - - } - } - - long time = event.getTimestamp(); - if (windowEndTime == -1) { - int factor = (int) (time / windowPeriod); - windowStartTime = factor * windowPeriod; - windowEndTime = (factor + 1) * windowPeriod; - addEvent(event); - return; - } - - if (time < windowStartTime) { - logger.debug("Event from the past."); - return; - } else if (!timeInWindow(time)) { - processData = true; - } - - addEvent(event); - } - } - - private boolean 
processSessionEvent(SessionEvent event) { - switch (event.getType()) { - case SessionStatusEvent.TYPE: { - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - connect(statusEvent); - } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - HttpFlowContext flowContext = flowContext(statusEvent); - if (flowContext != null) { - if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(statusEvent); - } - } - } - - return true; - } - case HttpSessionPayloadEvent.TYPE: { - SessionEvent sessEvent = (SessionEvent) event; - HttpFlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - switch (flowContext.state) { - case HttpFlowContext.STATE_CONNECTED: - case HttpFlowContext.STATE_RESP_RECEIVED: - case HttpFlowContext.STATE_ERROR: - if (send(flowContext, (HttpSessionPayloadEvent) event)) { - return true; - } - break; - case HttpFlowContext.STATE_DISCONNECTING: - case HttpFlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - connect(sessEvent); - } else { - return true; - } - break; - } - } else if (connectPartialSession) { - connect(sessEvent); - } - - break; - } - } - - return false; - } - - @Override - public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - while (!processData) { - waitQuietly(); - } - - for (;;) { - boolean breakLoop = true; - for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) { - SessionInfo session = entry.getKey(); - Queue<SessionEvent> events = entry.getValue(); - if (!events.isEmpty()) { - Event event = events.peek(); - if (timeInWindow(event.getTimestamp())) { - if (processSessionEvent((SessionEvent) event)) { - events.poll(); - } - - breakLoop = false; - } - } - } - - if (breakLoop) { - break; - } else { - waitQuietly(); - } - } - - if (!otherEvents.isEmpty()) { - Iterator<Event> it = otherEvents.iterator(); - while (it.hasNext()) { - 
Event event = it.next(); - if (timeInWindow(event.getTimestamp())) { - it.remove(); - switch (event.getType()) { - case DataLoopEnd.TYPE: - if (logger.isDebugEnabled()) { - logger.debug("DataLoopEnd received."); - } - - closeAllConnections(); - break; - case DataEnd.TYPE: - if (logger.isDebugEnabled()) { - logger.debug("DataEnd received. Deactivation."); - } - - working = false; - break; - } - } else { - break; - } - } - } - - clearWindow(); - processData = false; - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - } - - private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> { - - public static final int TYPE = 1011; - - public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) { - super(sessionInfo, payload); - setTimestamp(timestamp); - } - - @Override - public SessionEvent instanceForWorker(int index) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getType() { - return TYPE; - } - - } - - private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { - - public static final int TYPE = 1012; - - public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) { - super(sessionInfo, payload); - setTimestamp(timestamp); - } - - @Override - public SessionEvent instanceForWorker(int index) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getType() { - return TYPE; - } - - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java Fri Sep 08 08:47:02 2017 +0200 @@ -0,0 +1,89 @@ +package com.passus.st.client.http; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import com.passus.commons.service.ServiceUtils; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.Log4jConfigurationFactory; +import com.passus.st.client.Event; +import com.passus.st.client.SessionEvent; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.client.TestHttpClientListener; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class HttpAsynchClientWorkerTest extends AbstractWireMockTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @BeforeMethod + public void beforeMethod() { + String content = "test"; + stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + } + + @Test + public void testHandle() throws 
Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); + assertEquals(4, events.size()); + + NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + PORT); + emitter.start(); + + TestHttpClientListener listner = new TestHttpClientListener(); + + HttpAsynchClientWorker worker = new HttpAsynchClientWorker(emitter, "test", 0); + try { + worker.setListeners(Arrays.asList(listner)); + worker.start(); + + SessionEvent sessionEvent = (SessionEvent) events.get(0); + SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); + statusEvent.setTimestamp(sessionEvent.getTimestamp()); + worker.handle(statusEvent); + + events.forEach((event) -> { + worker.handle(event); + }); + + worker.join(); + assertTrue(listner.size() > 0); + assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); + TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); + String responseStr = event.getResponse().toString(); + assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); + assertTrue(responseStr.endsWith("test")); + } finally { + ServiceUtils.stopQuietly(emitter); + } + } + +}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java Thu Sep 07 15:42:35 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,89 +0,0 @@ -package com.passus.st.client.http; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import com.passus.commons.service.ServiceUtils; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.Log4jConfigurationFactory; -import com.passus.st.client.Event; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.client.TestHttpClientListener; -import com.passus.st.emitter.RuleBasedSessionMapper; -import com.passus.st.emitter.nio.NioEmitter; -import com.passus.st.utils.EventUtils; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * - * @author Mirosław Hawrot - */ -public class HttpTimeWindowClientWorkerTest extends AbstractWireMockTest { - - private NioEmitter prepareEmitter(String mapperRule) throws Exception { - RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); - sessionMapper.addRule(mapperRule); - - NioEmitter emitter = new NioEmitter(); - emitter.setSessionMapper(sessionMapper); - return emitter; - } - - @BeforeMethod - public void beforeMethod() { - String content = "test"; - stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) - .willReturn(aResponse() - .withHeader("Content-Type", "text/plain") - .withHeader("Content-Length", "" + content.length()) - .withBody(content))); - } - - @Test - public void testHandle() 
throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); - List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); - assertEquals(4, events.size()); - - NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + PORT); - emitter.start(); - - TestHttpClientListener listner = new TestHttpClientListener(); - - HttpTimeWindowClientWorker worker = new HttpTimeWindowClientWorker(emitter, "test", 0); - try { - worker.setListeners(Arrays.asList(listner)); - worker.start(); - - SessionEvent sessionEvent = (SessionEvent) events.get(0); - SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); - statusEvent.setTimestamp(sessionEvent.getTimestamp()); - worker.handle(statusEvent); - - events.forEach((event) -> { - worker.handle(event); - }); - - worker.join(); - assertTrue(listner.size() > 0); - assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); - TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); - String responseStr = event.getResponse().toString(); - assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); - assertTrue(responseStr.endsWith("test")); - } finally { - ServiceUtils.stopQuietly(emitter); - } - } - -}