Mercurial > stress-tester
changeset 532:f78f71168bef
HttpTimeWindowClientWorker in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 07 15:42:35 2017 +0200 @@ -233,7 +233,7 @@ @Override public void handle(Event event) { if (event instanceof SessionEvent) { - if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) { + if (event.getType() == SessionStatusEvent.TYPE) { SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { try { @@ -253,7 +253,7 @@ } } - } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) { + } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { HttpSessionPayloadEvent sessEvent = (HttpSessionPayloadEvent) event; currFlowContext = flowContext(sessEvent); if (currFlowContext == null && partialSession) {
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Thu Sep 07 15:42:35 2017 +0200 @@ -8,7 +8,7 @@ */ public class SessionStatusEvent extends SessionEvent { - public static final int HTTP_SESSION_STATUS = 11; + public static final int TYPE = 11; public static final int STATUS_OPENING = 1; public static final int STATUS_ESTABLISHED = 2; @@ -33,7 +33,7 @@ @Override public int getType() { - return HTTP_SESSION_STATUS; + return TYPE; } public int getStatus() {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Thu Sep 07 15:42:35 2017 +0200 @@ -530,7 +530,7 @@ lock.notifyAll(); } } - + protected boolean send(HttpFlowContext context, HttpSessionPayloadEvent event) { synchronized (lock) { if (event.getRequest() != null) {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Thu Sep 07 15:42:35 2017 +0200 @@ -165,13 +165,13 @@ && !localFlowContext.eventsQueue.isEmpty()) { Event event = localFlowContext.eventsQueue.peek(); - if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) { + if (event.getType() == SessionStatusEvent.TYPE) { SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { localFlowContext.eventsQueue.poll(); close((SessionStatusEvent) event); } - } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD + } else if (event.getType() == HttpSessionPayloadEvent.TYPE && canSend(flowContext)) { localFlowContext.eventsQueue.poll(); send(flowContext, (HttpSessionPayloadEvent) event); @@ -196,7 +196,7 @@ private void processEvent(Event event) { if (event instanceof SessionEvent) { switch (event.getType()) { - case SessionStatusEvent.HTTP_SESSION_STATUS: + case SessionStatusEvent.TYPE: SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { LocalHttpFlowContext flowContext = flowContext((SessionEvent) event); @@ -210,7 +210,7 @@ } } break; - case HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD: { + case HttpSessionPayloadEvent.TYPE: { HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; LocalHttpFlowContext flowContext = flowContext(payloadEvent); if (flowContext != null) { @@ -261,11 +261,11 @@ public void handle(Event event) { Event newEvent = null; switch (event.getType()) { - case HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD: + case HttpSessionPayloadEvent.TYPE: semaphore.acquireUninterruptibly(); newEvent = eventInstanceForWorker(event); break; - case SessionStatusEvent.HTTP_SESSION_STATUS: + case SessionStatusEvent.TYPE: case DataLoopEnd.TYPE: case DataEnd.TYPE: newEvent = event;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Thu Sep 07 15:42:35 2017 +0200 @@ -13,7 +13,7 @@ */ public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpReqResp> { - public static final int HTTP_SESSION_PAYLOAD = 12; + public static final int TYPE = 12; public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp) { this(sessionInfo, new HttpReqResp(req, resp)); @@ -33,7 +33,7 @@ @Override public int getType() { - return HTTP_SESSION_PAYLOAD; + return TYPE; } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Thu Sep 07 15:42:35 2017 +0200 @@ -123,7 +123,7 @@ } if (event instanceof SessionEvent) { - if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) { + if (event.getType() == SessionStatusEvent.TYPE) { SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { try { @@ -143,7 +143,7 @@ } return true; - } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) { + } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { SessionEvent sessEvent = (SessionEvent) event; HttpFlowContext flowContext = flowContext(sessEvent); if (flowContext != null) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpTimeWindowClientWorker.java Thu Sep 07 15:42:35 2017 +0200 @@ -0,0 +1,394 @@ +package com.passus.st.client.http; + +import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; +import com.passus.commons.collection.MixedLinkedList; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.client.DataEvents.DataEnd; +import com.passus.st.client.DataEvents.DataLoopEnd; +import com.passus.st.client.Event; +import com.passus.st.client.SessionEvent; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.plugin.PluginConstants; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.TreeSet; +import com.passus.st.client.SessionPayloadEvent; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * + * @author Mirosław Hawrot + */ +@Plugin(name = HttpTimeWindowClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) +public class HttpTimeWindowClientWorker extends HttpFlowBasedClientWorker { + + public static final String TYPE = "timeWindow"; + + private long waitTimeout = 100; + + private long windowPeriod = 10; + + private long windowStartTime = -1; + + private long windowEndTime = -1; + + private final Map<SessionInfo, Queue<SessionEvent>> sessionsEvents; + + private final Queue<Event> otherEvents = new LinkedList<>(); + + private boolean processData = false; + + public HttpTimeWindowClientWorker(Emitter emitter, String name, int index) { + super(emitter, name, index); + this.sessionsEvents = new HashMap<>(); + } + + public long getWindowPeriod() { + return windowPeriod; + } + + public void setWindowPeriod(long windowPeriod) { + Assert.greaterThanZero(windowPeriod, "tickTime"); + this.windowPeriod = windowPeriod; + } + + public long getWaitTimeout() { + return waitTimeout; + } + + public void setWaitTimeout(long waitTimeout) { + Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); + this.waitTimeout = waitTimeout; + } + + @Override + protected void flowStateChanged(HttpFlowContext context, int oldState) { + synchronized (lock) { + SessionInfo session = context.sessionInfo(); + switch (context.state()) { + case HttpFlowContext.STATE_RESP_RECEIVED: { + Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); + if (sessionEvents != null + && !sessionEvents.isEmpty() + && sessionEvents.peek().getType() == HttpResponseEvent.TYPE) { + sessionEvents.poll(); + } + + break; + } + case HttpFlowContext.STATE_DISCONNECTED: { + Queue<SessionEvent> sessionEvents = getSessionEvents(session, false); + if (sessionEvents != null) { + sessionEvents.clear(); + sessionsEvents.remove(session); + } + + break; + } + } + + lock.notifyAll(); + } + } + + private void waitQuietly() { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + } + } + + @Override + protected void closeAllConnections() { + synchronized (lock) { + boolean wait; + do { + wait = false; + for (HttpFlowContext flowContext : sessions.values()) { + if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) { + wait = true; + break; + } + } + + if (wait) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } while (wait); + + super.closeAllConnections(); + while (!sessions.isEmpty()) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } + } + + private Queue<SessionEvent> getSessionEvents(SessionInfo session) { + return getSessionEvents(session, true); + } + + private Queue<SessionEvent> getSessionEvents(SessionInfo session, boolean create) { + Queue<SessionEvent> sessionEvents = sessionsEvents.get(session); + if (sessionEvents == null && create) { + sessionEvents = new ConcurrentLinkedQueue<>(); + sessionsEvents.put(session, sessionEvents); + } + + return sessionEvents; + } + + private void addEvent(Event event) { + Event newEvent = eventInstanceForWorker(event); + switch (newEvent.getType()) { + case HttpSessionPayloadEvent.TYPE: { + long time = event.getTimestamp(); + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent; + SessionInfo session = payloadEvent.getSessionInfo(); + Queue<SessionEvent> sessionEvents = getSessionEvents(session); + sessionEvents.add(payloadEvent); + + HttpResponse resp = payloadEvent.getResponse(); + HttpRequest req = payloadEvent.getRequest(); + sessionEvents.add(new HttpResponseEvent(session, resp, + time + (resp.getTimestamp() - req.getTimestamp()) + )); + + break; + } + case SessionStatusEvent.TYPE: { + SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; + Queue<SessionEvent> sessionEvents = getSessionEvents(statusEvent.getSessionInfo()); + sessionEvents.add(statusEvent); + break; + } + case DataLoopEnd.TYPE: + case DataEnd.TYPE: + otherEvents.add(event); + processData = true; + break; + } + } + + private boolean timeInWindow(long time) { + return (windowStartTime <= time && time < windowEndTime); + } + + private void clearWindow() { + windowStartTime = -1; + windowEndTime = -1; + } + + @Override + public void handle(Event event) { + synchronized (lock) { + while (processData) { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + + } + } + + long time = event.getTimestamp(); + if (windowEndTime == -1) { + int factor = (int) (time / windowPeriod); + windowStartTime = factor * windowPeriod; + windowEndTime = (factor + 1) * windowPeriod; + addEvent(event); + return; + } + + if (time < windowStartTime) { + logger.debug("Event from the past."); + return; + } else if (!timeInWindow(time)) { + processData = true; + } + + addEvent(event); + } + } + + private boolean processSessionEvent(SessionEvent event) { + switch (event.getType()) { + case SessionStatusEvent.TYPE: { + SessionStatusEvent statusEvent = (SessionStatusEvent) event; + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + connect(statusEvent); + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + HttpFlowContext flowContext = flowContext(statusEvent); + if (flowContext != null) { + if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { + close(statusEvent); + } + } + } + + return true; + } + case HttpSessionPayloadEvent.TYPE: { + SessionEvent sessEvent = (SessionEvent) event; + HttpFlowContext flowContext = flowContext(sessEvent); + if (flowContext != null) { + switch (flowContext.state) { + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_RESP_RECEIVED: + case HttpFlowContext.STATE_ERROR: + if (send(flowContext, (HttpSessionPayloadEvent) event)) { + return true; + } + break; + case HttpFlowContext.STATE_DISCONNECTING: + case HttpFlowContext.STATE_DISCONNECTED: + if (connectPartialSession) { + connect(sessEvent); + } else { + return true; + } + break; + } + } else if (connectPartialSession) { + connect(sessEvent); + } + + break; + } + } + + return false; + } + + @Override + public void run() { + synchronized (lock) { + working = true; + while (working) { + try { + while (!processData) { + waitQuietly(); + } + + for (;;) { + boolean breakLoop = true; + for (Map.Entry<SessionInfo, Queue<SessionEvent>> entry : sessionsEvents.entrySet()) { + SessionInfo session = entry.getKey(); + Queue<SessionEvent> events = entry.getValue(); + if (!events.isEmpty()) { + Event event = events.peek(); + if (timeInWindow(event.getTimestamp())) { + if (processSessionEvent((SessionEvent) event)) { + events.poll(); + } + + breakLoop = false; + } + } + } + + if (breakLoop) { + break; + } else { + waitQuietly(); + } + } + + if (!otherEvents.isEmpty()) { + Iterator<Event> it = otherEvents.iterator(); + while (it.hasNext()) { + Event event = it.next(); + if (timeInWindow(event.getTimestamp())) { + it.remove(); + switch (event.getType()) { + case DataLoopEnd.TYPE: + if (logger.isDebugEnabled()) { + logger.debug("DataLoopEnd received."); + } + + closeAllConnections(); + break; + case DataEnd.TYPE: + if (logger.isDebugEnabled()) { + logger.debug("DataEnd received. Deactivation."); + } + + working = false; + break; + } + } else { + break; + } + } + } + + clearWindow(); + processData = false; + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + } + + private final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> { + + public static final int TYPE = 1011; + + public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) { + super(sessionInfo, payload); + setTimestamp(timestamp); + } + + @Override + public SessionEvent instanceForWorker(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getType() { + return TYPE; + } + + } + + private final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { + + public static final int TYPE = 1012; + + public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) { + super(sessionInfo, payload); + setTimestamp(timestamp); + } + + @Override + public SessionEvent instanceForWorker(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getType() { + return TYPE; + } + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMvelFilter.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMvelFilter.java Thu Sep 07 15:42:35 2017 +0200 @@ -94,6 +94,10 @@ vars.put("$req", req); vars.put("$resp", resp); vars.put("$context", context); + if (context != null) { + vars.put("$scopes", context.scopes()); + vars.put("$httpSession", context.scopes().getSession(req)); + } CachingMapVariableResolverFactory factory = new CachingMapVariableResolverFactory(vars); if (globalFactory != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/EventTest.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/EventTest.java Thu Sep 07 15:42:35 2017 +0200 @@ -3,7 +3,9 @@ import com.passus.commons.Loader; import com.passus.commons.Resolver; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.lang.reflect.Parameter; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; @@ -42,16 +44,11 @@ private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws ReflectiveOperationException { Map<Class, Integer> types = new LinkedHashMap<>(classes.size()); for (final Class<?> clazz : classes) { - Constructor<?> ctor; - try { - ctor = clazz.getDeclaredConstructor(); - } catch (NoSuchMethodException no) { - ctor = clazz.getConstructors()[0]; + Field field = clazz.getDeclaredField("TYPE"); + if (field.isAccessible()) { + types.put(clazz, (Integer) field.get(null)); } - ctor.setAccessible(true); - int params = ctor.getParameterCount(); - Event e = (Event) ctor.newInstance(new Object[params]); - types.put(clazz, e.getType()); + } return types; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpTimeWindowClientWorkerTest.java Thu Sep 07 15:42:35 2017 +0200 @@ -0,0 +1,89 @@ +package com.passus.st.client.http; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import com.passus.commons.service.ServiceUtils; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.Log4jConfigurationFactory; +import com.passus.st.client.Event; +import com.passus.st.client.SessionEvent; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.client.TestHttpClientListener; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class HttpTimeWindowClientWorkerTest extends AbstractWireMockTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @BeforeMethod + public void beforeMethod() { + String content = "test"; + stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + } + + @Test + public void testHandle() throws Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); + assertEquals(4, events.size()); + + NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + PORT); + emitter.start(); + + TestHttpClientListener listner = new TestHttpClientListener(); + + HttpTimeWindowClientWorker worker = new HttpTimeWindowClientWorker(emitter, "test", 0); + try { + worker.setListeners(Arrays.asList(listner)); + worker.start(); + + SessionEvent sessionEvent = (SessionEvent) events.get(0); + SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); + statusEvent.setTimestamp(sessionEvent.getTimestamp()); + worker.handle(statusEvent); + + events.forEach((event) -> { + worker.handle(event); + }); + + worker.join(); + assertTrue(listner.size() > 0); + assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); + TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); + String responseStr = event.getResponse().toString(); + assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); + assertTrue(responseStr.endsWith("test")); + } finally { + ServiceUtils.stopQuietly(emitter); + } + } + +}
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Fri Sep 01 09:59:19 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Thu Sep 07 15:42:35 2017 +0200 @@ -33,7 +33,7 @@ src.stop(); assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent); - assertTrue(handler.findFirst(HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) instanceof HttpSessionPayloadEvent); + assertTrue(handler.findFirst(HttpSessionPayloadEvent.TYPE) instanceof HttpSessionPayloadEvent); assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd); assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); }