Mercurial > stress-tester
changeset 958:c15144d4da64
Refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java Thu May 30 10:26:34 2019 +0200 @@ -4,7 +4,6 @@ import com.passus.commons.service.Service; import com.passus.st.client.*; import com.passus.st.client.http.HttpScopes; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.client.http.filter.HttpFilter; import com.passus.st.client.http.filter.HttpFlowUtils; import com.passus.st.emitter.SessionInfo;
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu May 30 10:26:34 2019 +0200 @@ -6,7 +6,8 @@ import com.passus.net.http.*; import com.passus.net.http.session.HttpSessionAnalyzer; import com.passus.st.client.*; -import com.passus.st.client.http.*; +import com.passus.st.client.http.HttpScopes; +import com.passus.st.client.http.ReporterRemoteDestination; import com.passus.st.emitter.SessionInfo; import com.passus.st.metric.FileMetricsCollectionAppender; import com.passus.st.metric.ScheduledMetricsCollector; @@ -27,6 +28,7 @@ import java.util.stream.Collectors; import static com.passus.st.Main.*; +import static com.passus.st.Protocols.HTTP; import static com.passus.st.client.http.HttpConsts.*; import static com.passus.st.client.http.filter.HttpFlowUtils.createFlowContext; import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext; @@ -243,8 +245,8 @@ } } - } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { - HttpSessionPayloadEvent sessEvent = (HttpSessionPayloadEvent) event; + } else if (event.getType() == SessionPayloadEvent.TYPE) { + SessionPayloadEvent sessEvent = (SessionPayloadEvent) event; currFlowContext = flowContext(sessEvent); if (currFlowContext == null && partialSession) { try { @@ -264,24 +266,26 @@ } } - private void send(HttpSessionPayloadEvent event) { + private void send(SessionPayloadEvent event) { synchronized (lock) { - HttpRequest req = event.getRequest(); - HttpResponse resp = event.getResponse(); - System.out.println(req == null ? null : req.getUrl()); - ++count; - if (req != null && resp != null) { - try { - setSizeAndTimeTags(req, reqEncoder); - setSizeAndTimeTags(resp, respEncoder); - FlowContext ctx = sessions.get(event.getSessionInfo()); - if (ctx != null) { - extractHttpContext(ctx).scopes().removeConversation(req); + if (event.getProtocolId() == HTTP) { + HttpRequest req = (HttpRequest) event.getRequest(); + HttpResponse resp = (HttpResponse) event.getResponse(); + System.out.println(req == null ? null : req.getUrl()); + ++count; + if (req != null && resp != null) { + try { + setSizeAndTimeTags(req, reqEncoder); + setSizeAndTimeTags(resp, respEncoder); + FlowContext ctx = sessions.get(event.getSessionInfo()); + if (ctx != null) { + extractHttpContext(ctx).scopes().removeConversation(req); + } + + reporter.responseReceived(req, resp, ctx); + } catch (Exception e) { + logger.debug(e.getMessage(), e); } - - reporter.responseReceived(req, resp, ctx); - } catch (Exception e) { - logger.debug(e.getMessage(), e); } } }
--- a/stress-tester/src/main/java/com/passus/st/ReaderMain.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/ReaderMain.java Thu May 30 10:26:34 2019 +0200 @@ -2,25 +2,16 @@ import com.passus.data.DataSourceUtils; import com.passus.data.PooledByteBuffAllocator; -import com.passus.net.http.HttpMessageHelper; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpRequestEncoder; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.HttpResponseEncoder; +import com.passus.net.http.*; import com.passus.st.client.Event; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.client.http.filter.HttpMessagePredicate; import com.passus.st.emitter.SessionInfo; import com.passus.st.filter.Transformers; import com.passus.st.source.EventSource; import com.passus.st.source.NcEventSource; import com.passus.st.source.PcapSessionEventSource; -import static com.passus.st.utils.CliUtils.option; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.util.function.Predicate; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; @@ -28,8 +19,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.function.Predicate; + +import static com.passus.st.Protocols.HTTP; +import static com.passus.st.utils.CliUtils.option; + /** - * * @author Mirosław Hawrot */ public class ReaderMain { @@ -113,60 +111,62 @@ printSessionInfo(statusEvent.getSessionInfo(), ps); ps.print(" status: "); ps.println(SessionStatusEvent.statusToString(statusEvent.getStatus())); - } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - HttpRequest req = payloadEvent.getRequest(); - HttpResponse resp = payloadEvent.getResponse(); - if (filter != null && !filter.test(req, resp, null)) { - return; - } - - printSessionInfo(payloadEvent.getSessionInfo(), ps); - ps.println(" payload: "); - - if (req != null) { - try { - HttpRequestEncoder.INSTANCE.encodeHeaders(req, ps); - if (printContent) { - if (!printBinaryContent && messageHelper.hasBinaryContent(req)) { - ps.println("<binary content>"); - } else { - if (decodeContent) { - messageHelper.decodeContent(req, false); - } - - HttpRequestEncoder.INSTANCE.encodeContent(req, ps); - } - } + } else if (event.getType() == SessionPayloadEvent.TYPE) { + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; + if (payloadEvent.getProtocolId() == HTTP) { + HttpRequest req = (HttpRequest) payloadEvent.getRequest(); + HttpResponse resp = (HttpResponse) payloadEvent.getResponse(); + if (filter != null && !filter.test(req, resp, null)) { + return; + } - ps.println(""); - } catch (IOException ex) { - logger.debug(ex.getMessage(), ex); - } finally { - DataSourceUtils.release(req.getContent()); - } - } + printSessionInfo(payloadEvent.getSessionInfo(), ps); + ps.println(" payload: "); - if (resp != null) { - try { - HttpResponseEncoder.INSTANCE.encodeHeaders(resp, ps); - if (printContent) { - if (!printBinaryContent && messageHelper.hasBinaryContent(req)) { - ps.println("<binary content>"); - } else { - if (decodeContent) { - messageHelper.decodeContent(resp, false); + if (req != null) { + try { + HttpRequestEncoder.INSTANCE.encodeHeaders(req, ps); + if (printContent) { + if (!printBinaryContent && messageHelper.hasBinaryContent(req)) { + ps.println("<binary content>"); + } else { + if (decodeContent) { + messageHelper.decodeContent(req, false); + } + + HttpRequestEncoder.INSTANCE.encodeContent(req, ps); } - - HttpResponseEncoder.INSTANCE.encodeContent(resp, ps); } - } - ps.println(""); - } catch (IOException ex) { - logger.debug(ex.getMessage(), ex); - } finally { - DataSourceUtils.release(resp.getContent()); + ps.println(""); + } catch (IOException ex) { + logger.debug(ex.getMessage(), ex); + } finally { + DataSourceUtils.release(req.getContent()); + } + } + + if (resp != null) { + try { + HttpResponseEncoder.INSTANCE.encodeHeaders(resp, ps); + if (printContent) { + if (!printBinaryContent && messageHelper.hasBinaryContent(req)) { + ps.println("<binary content>"); + } else { + if (decodeContent) { + messageHelper.decodeContent(resp, false); + } + + HttpResponseEncoder.INSTANCE.encodeContent(resp, ps); + } + } + + ps.println(""); + } catch (IOException ex) { + logger.debug(ex.getMessage(), ex); + } finally { + DataSourceUtils.release(resp.getContent()); + } } } }
--- a/stress-tester/src/main/java/com/passus/st/StatsMain.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/StatsMain.java Thu May 30 10:26:34 2019 +0200 @@ -5,20 +5,16 @@ import com.passus.data.ByteString; import com.passus.data.ByteStringImpl; import com.passus.data.PooledByteBuffAllocator; -import static com.passus.net.http.HttpHeaders.CONTENT_ENCODING; -import static com.passus.net.http.HttpHeaders.TRANSFER_ENCODING; import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpMessageHelper; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.st.client.Event; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.source.EventSource; import com.passus.st.source.NcEventSource; import com.passus.st.source.PcapSessionEventSource; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; @@ -27,8 +23,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.HashMap; +import java.util.Map; + +import static com.passus.net.http.HttpHeaders.CONTENT_ENCODING; +import static com.passus.net.http.HttpHeaders.TRANSFER_ENCODING; +import static com.passus.st.Protocols.HTTP; + /** - * * @author Mirosław Hawrot */ public class StatsMain { @@ -83,30 +85,32 @@ private void stats(Event event) { if (event.getType() == SessionStatusEvent.TYPE) { - } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - HttpRequest req = payloadEvent.getRequest(); - HttpResponse resp = payloadEvent.getResponse(); + } else if (event.getType() == SessionPayloadEvent.TYPE) { + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; + if (payloadEvent.getProtocolId() == HTTP) { + HttpRequest req = (HttpRequest) payloadEvent.getRequest(); + HttpResponse resp = (HttpResponse) payloadEvent.getResponse(); - if (req != null) { - stats.reqNum++; - stats.addContentEncoding(req); - stats.addTransferEncoding(req); - stats.addEncoding(req); - if (req.getContent() != null) { - stats.reqContentSize.update(req.getContent().available()); - stats.reqContentSizeSum += req.getContent().available(); + if (req != null) { + stats.reqNum++; + stats.addContentEncoding(req); + stats.addTransferEncoding(req); + stats.addEncoding(req); + if (req.getContent() != null) { + stats.reqContentSize.update(req.getContent().available()); + stats.reqContentSizeSum += req.getContent().available(); + } } - } - if (resp != null) { - stats.respNum++; - stats.addContentEncoding(resp); - stats.addTransferEncoding(resp); - stats.addEncoding(resp); - if (resp.getContent() != null) { - stats.respContentSize.update(resp.getContent().available()); - stats.respContentSizeSum += resp.getContent().available(); + if (resp != null) { + stats.respNum++; + stats.addContentEncoding(resp); + stats.addTransferEncoding(resp); + stats.addEncoding(resp); + if (resp.getContent() != null) { + stats.respContentSize.update(resp.getContent().available()); + stats.respContentSizeSum += resp.getContent().available(); + } } } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu May 30 10:26:34 2019 +0200 @@ -0,0 +1,558 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class AsynchFlowWorker extends FlowWorkerBased { + + public static final String TYPE = "asynch"; + + private long waitTimeout = 100; + + private long windowPeriod = 10; + + private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>(); + + private TasksTimeWindow currentWindow; + + private volatile boolean processWindow = false; + + private volatile boolean flowStateChanged = false; + + private final Object readerLock = new Object(); + + private boolean responseSynch = false; + + public AsynchFlowWorker(Emitter emitter, String name, int index) { + super(emitter, name, index); + + } + + public long getWindowPeriod() { + return windowPeriod; + } + + public void setWindowPeriod(long windowPeriod) { + Assert.greaterThanZero(windowPeriod, "tickTime"); + this.windowPeriod = windowPeriod; + } + + public long getWaitTimeout() { + return waitTimeout; + } + + public void setWaitTimeout(long waitTimeout) { + Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); + this.waitTimeout = waitTimeout; + } + + public boolean isResponseSynch() { + return responseSynch; + } + + public void setResponseSynch(boolean responseSynch) { + this.responseSynch = responseSynch; + } + + private void waitQuietly() { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + } + } + + @Override + protected void closeAllConnections() { + synchronized (lock) { + boolean wait; + do { + wait = false; + for (FlowContext flowContext : sessions.values()) { + if (flowContext.state() == FlowContext.STATE_REQ_SENT) { + wait = true; + break; + } + } + + if (wait) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } while (wait); + + super.closeAllConnections(); + while (!sessions.isEmpty()) { + try { + lock.wait(100); + } catch (Exception e) { + } + } + } + } + + private void removeWindow(TasksTimeWindow window) { + if (window != null) { + windows.remove(window); + } + } + + private TasksTimeWindow createWindow(long time) { + TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod); + windows.add(window); + return window; + } + + private TasksTimeWindow getWindow(long time, boolean create) { + if (windows.isEmpty()) { + if (create) { + return createWindow(time); + } + + return null; + } + + TasksTimeWindow firstWindow = windows.peek(); + if (firstWindow.inTimeRange(time)) { + return firstWindow; + } else if (windows.size() > 1) { + Iterator<TasksTimeWindow> it = windows.iterator(); + it.next(); + + while (it.hasNext()) { + TasksTimeWindow window = it.next(); + if (window.inTimeRange(time)) { + return window; + } + } + } + + if (create) { + return createWindow(time); + } + + return null; + } + + @Override + public void sessionInvalidated(SessionInfo session) throws Exception { + synchronized (lock) { + if (logger.isDebugEnabled()) { + logger.debug("Session {} invalidated.", session); + } + + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); + } + + addBlockedSession(session); + lock.notifyAll(); + } + } + + @Override + protected void flowStateChanged(FlowContext context, int oldState) { + synchronized (lock) { + flowStateChanged = true; + lock.notifyAll(); + } + } + + private void addEvent(Event event, TasksTimeWindow window) { + switch (event.getType()) { + case SessionPayloadEvent.TYPE: { + Event newEvent = eventInstanceForWorker(event); + long time = newEvent.getTimestamp(); + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent; + SessionInfo session = payloadEvent.getSessionInfo(); + + SessionEventsTask task = window.getSessionEventsTask(session, true); + task.events.add(payloadEvent); + + if (responseSynch) { + HttpResponse resp = (HttpResponse) payloadEvent.getResponse(); + HttpRequest req = (HttpRequest) payloadEvent.getRequest(); + long respTime = time + (resp.getTimestamp() - req.getTimestamp()); + HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime); + + task.events.add(respEvent); + } + + break; + } + case SessionStatusEvent.TYPE: { + Event newEvent = eventInstanceForWorker(event); + SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; + SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true); + task.events.add(statusEvent); + break; + } + case DataEvents.DataLoopEnd.TYPE: + window.add(DataLoopEndTask.INSTANCE); + processWindow = true; + break; + case DataEvents.DataEnd.TYPE: + window.add(DataEndTask.INSTANCE); + processWindow = true; + break; + } + } + + @Override + public void handle(Event event) { + synchronized (readerLock) { + while (processWindow) { + try { + readerLock.wait(); + } catch (InterruptedException ignore) { + + } + } + } + + synchronized (lock) { + TasksTimeWindow window = getWindow(event.getTimestamp(), true); + if (currentWindow == null) { + currentWindow = window; + } else if (window != currentWindow) { + currentWindow = windows.peek(); + processWindow = true; + } + + addEvent(event, window); + if (processWindow) { + lock.notifyAll(); + } + } + + } + + private boolean processSessionEvent(SessionEvent event) { + switch (event.getType()) { + case SessionStatusEvent.TYPE: { + SessionStatusEvent statusEvent = (SessionStatusEvent) event; + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + connect(statusEvent); + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + FlowContext flowContext = flowContext(statusEvent); + if (flowContext != null) { + if (flowContext.state() != FlowContext.STATE_REQ_SENT) { + close(statusEvent); + } + } + } + + return true; + } + case SessionPayloadEvent.TYPE: { + SessionEvent sessEvent = event; + FlowContext flowContext = flowContext(sessEvent); + if (flowContext != null) { + switch (flowContext.state()) { + case FlowContext.STATE_CONNECTING: + case FlowContext.STATE_REQ_SENT: + return false; + case FlowContext.STATE_CONNECTED: + case FlowContext.STATE_RESP_RECEIVED: + case FlowContext.STATE_ERROR: + if (send(flowContext, (SessionPayloadEvent) event)) { + return true; + } + case FlowContext.STATE_DISCONNECTING: + case FlowContext.STATE_DISCONNECTED: + if (connectPartialSession) { + connect(sessEvent); + } else { + return true; + } + break; + } + } else if (connectPartialSession) { + connect(sessEvent); + } + + return true; + } + case HttpResponseEvent.TYPE: { + SessionEvent sessEvent = event; + FlowContext flowContext = flowContext(sessEvent); + if (flowContext != null) { + return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED + || flowContext.state() >= FlowContext.STATE_DISCONNECTING); + } + + return true; + } + } + + return false; + } + + @Override + public void run() { + synchronized (lock) { + working = true; + while (working) { + try { + lock.wait(); + + boolean dataLoopEnd = false; + boolean dataEnd = false; + + for (; ; ) { + Iterator<Task> it = currentWindow.tasks.iterator(); + while (it.hasNext()) { + Task task = it.next(); + switch (task.type()) { + case SessionEventsTask.TYPE: + SessionEventsTask sessionTask = (SessionEventsTask) task; + if (isBlockedSession(sessionTask.session)) { + sessionTask.events.clear(); + it.remove(); + return; + } + + if (!sessionTask.events.isEmpty()) { + Event event = sessionTask.events.get(0); + if (processSessionEvent((SessionEvent) event)) { + sessionTask.events.remove(0); + } + } + + if (sessionTask.events.isEmpty()) { + it.remove(); + } + + break; + case DataLoopEndTask.TYPE: + dataLoopEnd = true; + it.remove(); + break; + case DataEndTask.TYPE: + dataEnd = true; + it.remove(); + break; + } + } + + if (currentWindow.tasks.isEmpty()) { + break; + } else if (!flowStateChanged) { + waitQuietly(); + } else { + flowStateChanged = false; + } + } + + if (dataLoopEnd) { + closeAllConnections(); + } + + if (dataEnd) { + working = false; + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } finally { + removeWindow(currentWindow); + processWindow = false; + synchronized (readerLock) { + readerLock.notifyAll(); + } + } + } + } + } + + private static final class HttpResponseEvent extends SessionEvent { + + public static final int TYPE = 1012; + + private final HttpResponse payload; + + public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) { + super(sessionInfo, sourceName); + this.payload = payload; + setTimestamp(timestamp); + } + + public HttpResponse getPayload() { + return payload; + } + + @Override + public SessionEvent instanceForWorker(int index) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getType() { + return TYPE; + } + + } + + private static interface Task { + + public int type(); + + } + + private static final class SessionEventsTask implements Task { + + public static final int TYPE = 1; + + private final SessionInfo session; + + private final List<SessionEvent> events = new LinkedList<>(); + + public SessionEventsTask(SessionInfo session) { + this.session = session; + } + + @Override + public int type() { + return TYPE; + } + + } + + private static final class DataLoopEndTask implements Task { + + public static final DataLoopEndTask INSTANCE = new DataLoopEndTask(); + + public static final int TYPE = 2; + + @Override + public int type() { + return TYPE; + } + + } + + private static final class DataEndTask implements Task { + + public static final DataEndTask INSTANCE = new DataEndTask(); + + public static final int TYPE = 3; + + @Override + public int type() { + return TYPE; + } + + } + + private static final class TasksTimeWindow { + + private final long startTime; + + private final long endTime; + + private final List<Task> tasks = new LinkedList<>(); + + private TasksTimeWindow(long time, long period) { + int factor = (int) (time / period); + startTime = factor * period; + endTime = (factor + 1) * period; + } + + public long startTime() { + return startTime; + } + + public long endTime() { + return endTime; + } + + public boolean inTimeRange(long time) { + return (startTime <= time && time < endTime); + } + + public void add(Task value) { + tasks.add(value); + } + + private SessionEventsTask createSessionEventsTask(SessionInfo session) { + SessionEventsTask task = new SessionEventsTask(session); + tasks.add(task); + return task; + } + + public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) { + if (tasks.isEmpty()) { + if (create) { + return createSessionEventsTask(session); + } + + return null; + } + + for (Task task : tasks) { + if (task.type() == SessionEventsTask.TYPE) { + SessionEventsTask sessionTask = (SessionEventsTask) task; + if (sessionTask.session.equals(session)) { + return sessionTask; + } + } + } + + if (create) { + return createSessionEventsTask(session); + } + + return null; + } + + public List<Task> values() { + return tasks; + } + + public long getPeroid() { + return (endTime - startTime); + } + + @Override + public String toString() { + return "TimeWindow{" + + "startTimestamp=" + startTime + + ", endTimestamp=" + endTime + '}'; + } + + @Override + public int hashCode() { + int hash = 5; + hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32)); + hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32)); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (!(obj instanceof TasksTimeWindow)) { + return false; + } + + final TasksTimeWindow other = (TasksTimeWindow) obj; + return this.startTime == other.startTime + && this.endTime == other.endTime; + } + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java Thu May 30 10:26:34 2019 +0200 @@ -13,6 +13,6 @@ return new HttpClientX(); } - return null; + throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'."); } }
--- a/stress-tester/src/main/java/com/passus/st/client/FilterAware.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FilterAware.java Thu May 30 10:26:34 2019 +0200 @@ -8,10 +8,10 @@ */ public interface FilterAware { - public List<FlowFilter> getFilters(); + List<FlowFilter> getFilters(); - public void setFilters(Collection<FlowFilter> filters); + void setFilters(Collection<FlowFilter> filters); - public void addFilter(FlowFilter filter); + void addFilter(FlowFilter filter); }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Thu May 30 10:26:34 2019 +0200 @@ -25,6 +25,8 @@ protected final Emitter emitter; + protected ClientXFactory clientFactory = new ClientXFactoryImpl(); + protected boolean collectMetric; protected HttpClientWorkerMetric metric; @@ -33,8 +35,6 @@ protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - protected ClientXFactory clientFactory; - public FlowWorker(Emitter emitter, String name, int index) { super(name + index); Assert.notNull(emitter, "emitter"); @@ -65,6 +65,15 @@ this.filterChain = filterChain; } + public ClientXFactory getClientFactory() { + return clientFactory; + } + + public void setClientFactory(ClientXFactory clientFactory) { + Assert.notNull(clientFactory, "clientFactory"); + this.clientFactory = clientFactory; + } + protected final void fireResponseReceived(Object request, Object response, FlowContext context) { if (listener != null) { listener.responseReceived(request, response, context);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Thu May 30 10:26:34 2019 +0200 @@ -111,7 +111,6 @@ flowContext.clear(); break; case FlowContext.STATE_CONNECTED: - flowContext.client().init(flowContext); flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); break; case FlowContext.STATE_ERROR: @@ -194,6 +193,10 @@ } FlowContext flowContext = createFlowContext(session); + //TODO Malo optymalne + ClientX client = clientFactory.create(session.getProtocolId()); + client.init(flowContext); + flowContext.client(client); sessions.put(session, flowContext); return flowContext; }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu May 30 10:26:34 2019 +0200 @@ -234,7 +234,10 @@ loopEnd = true; closeAllConnections(); filterChain.reset(); - loop = currFlowContext.loop() + 1; + if (currFlowContext != null) { + loop = currFlowContext.loop() + 1; + } + loopEnd = false; return true; } else if (event.getType() == DataEvents.DataEnd.TYPE) {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,567 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.DataEvents.DataEnd; -import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.*; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.plugin.PluginConstants; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -@Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) -public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker { - - public static final String TYPE = "asynch"; - - private long waitTimeout = 100; - - private long windowPeriod = 10; - - private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>(); - - private TasksTimeWindow currentWindow; - - private volatile boolean processWindow = false; - - private volatile boolean flowStateChanged = false; - - private final Object readerLock = new Object(); - - private boolean responseSynch = false; - - public HttpAsynchClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - - } - - public long getWindowPeriod() { - return windowPeriod; - } - - public void setWindowPeriod(long windowPeriod) { - Assert.greaterThanZero(windowPeriod, "tickTime"); - this.windowPeriod = windowPeriod; - } - - public long getWaitTimeout() { - return waitTimeout; - } - - public void setWaitTimeout(long waitTimeout) { - Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); - this.waitTimeout = waitTimeout; - } - - public boolean isResponseSynch() { - return responseSynch; - } - - public void setResponseSynch(boolean responseSynch) { - this.responseSynch = responseSynch; - } - - private void waitQuietly() { - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { - } - } - - @Override - protected void closeAllConnections() { - synchronized (lock) { - boolean wait; - do { - wait = false; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() == FlowContext.STATE_REQ_SENT) { - wait = true; - break; - } - } - - if (wait) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } while (wait); - - super.closeAllConnections(); - while (!sessions.isEmpty()) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } - } - - private void removeWindow(TasksTimeWindow window) { - if (window != null) { - windows.remove(window); - } - } - - private TasksTimeWindow createWindow(long time) { - TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod); - windows.add(window); - return window; - } - - private TasksTimeWindow getWindow(long time, boolean create) { - if (windows.isEmpty()) { - if (create) { - return createWindow(time); - } - - return null; - } - - TasksTimeWindow firstWindow = windows.peek(); - if (firstWindow.inTimeRange(time)) { - return firstWindow; - } else if (windows.size() > 1) { - Iterator<TasksTimeWindow> it = windows.iterator(); - it.next(); - - while (it.hasNext()) { - TasksTimeWindow window = it.next(); - if (window.inTimeRange(time)) { - return window; - } - } - } - - if (create) { - return createWindow(time); - } - - return null; - } - - @Override - public void sessionInvalidated(SessionInfo session) throws Exception { - synchronized (lock) { - if (logger.isDebugEnabled()) { - logger.debug("Session {} invalidated.", session); - } - - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - - addBlockedSession(session); - lock.notifyAll(); - } - } - - @Override - protected void flowStateChanged(FlowContext context, int oldState) { - synchronized (lock) { - flowStateChanged = true; - lock.notifyAll(); - } - } - - private void addEvent(Event event, TasksTimeWindow window) { - switch (event.getType()) { - case SessionPayloadEvent.TYPE: { - Event newEvent = eventInstanceForWorker(event); - long time = newEvent.getTimestamp(); - SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent; - SessionInfo session = payloadEvent.getSessionInfo(); - - SessionEventsTask task = window.getSessionEventsTask(session, true); - task.events.add(payloadEvent); - - if (responseSynch) { - HttpResponse resp = (HttpResponse) payloadEvent.getResponse(); - HttpRequest req = (HttpRequest) payloadEvent.getRequest(); - long respTime = time + (resp.getTimestamp() - req.getTimestamp()); - HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime); - - task.events.add(respEvent); - } - - break; - } - case SessionStatusEvent.TYPE: { - Event newEvent = eventInstanceForWorker(event); - SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; - SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true); - task.events.add(statusEvent); - break; - } - case DataLoopEnd.TYPE: - window.add(DataLoopEndTask.INSTANCE); - processWindow = true; - break; - case DataEnd.TYPE: - window.add(DataEndTask.INSTANCE); - processWindow = true; - break; - } - } - - @Override - public void handle(Event event) { - synchronized (readerLock) { - while (processWindow) { - try { - readerLock.wait(); - } catch (InterruptedException ignore) { - - } - } - } - - synchronized (lock) { - TasksTimeWindow window = getWindow(event.getTimestamp(), true); - if (currentWindow == null) { - currentWindow = window; - } else if (window != currentWindow) { - currentWindow = windows.peek(); - processWindow = true; - } - - addEvent(event, window); - if (processWindow) { - lock.notifyAll(); - } - } - - } - - private boolean processSessionEvent(SessionEvent event) { - switch (event.getType()) { - case SessionStatusEvent.TYPE: { - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - connect(statusEvent); - } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - FlowContext flowContext = flowContext(statusEvent); - if (flowContext != null) { - if (flowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); - } - } - } - - return true; - } - case HttpSessionPayloadEvent.TYPE: { - SessionEvent sessEvent = event; - FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTING: - case FlowContext.STATE_REQ_SENT: - return false; - case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_RESP_RECEIVED: - case FlowContext.STATE_ERROR: - if (send(flowContext, (HttpSessionPayloadEvent) event)) { - return true; - } - case FlowContext.STATE_DISCONNECTING: - case FlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - connect(sessEvent); - } else { - return true; - } - break; - } - } else if (connectPartialSession) { - connect(sessEvent); - } - - return true; - } - case HttpResponseEvent.TYPE: { - SessionEvent sessEvent = event; - FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED - || flowContext.state() >= FlowContext.STATE_DISCONNECTING); - } - - return true; - } - } - - return false; - } - - @Override - public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - lock.wait(); - - boolean dataLoopEnd = false; - boolean dataEnd = false; - - for (; ; ) { - Iterator<Task> it = currentWindow.tasks.iterator(); - while (it.hasNext()) { - Task task = it.next(); - switch (task.type()) { - case SessionEventsTask.TYPE: - SessionEventsTask sessionTask = (SessionEventsTask) task; - if (isBlockedSession(sessionTask.session)) { - sessionTask.events.clear(); - it.remove(); - return; - } - - if (!sessionTask.events.isEmpty()) { - Event event = sessionTask.events.get(0); - if (processSessionEvent((SessionEvent) event)) { - sessionTask.events.remove(0); - } - } - - if (sessionTask.events.isEmpty()) { - it.remove(); - } - - break; - case DataLoopEndTask.TYPE: - dataLoopEnd = true; - it.remove(); - break; - case DataEndTask.TYPE: - dataEnd = true; - it.remove(); - break; - } - } - - if (currentWindow.tasks.isEmpty()) { - break; - } else if (!flowStateChanged) { - waitQuietly(); - } else { - flowStateChanged = false; - } - } - - if (dataLoopEnd) { - closeAllConnections(); - } - - if (dataEnd) { - working = false; - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } finally { - removeWindow(currentWindow); - processWindow = false; - synchronized (readerLock) { - readerLock.notifyAll(); - } - } - } - } - } - - private static final class HttpResponseEvent extends SessionEvent { - - public static final int TYPE = 1012; - - private final HttpResponse payload; - - public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) { - super(sessionInfo, sourceName); - this.payload = payload; - setTimestamp(timestamp); - } - - public HttpResponse getPayload() { - return payload; - } - - @Override - public SessionEvent instanceForWorker(int index) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getType() { - return TYPE; - } - - } - - private static interface Task { - - public int type(); - - } - - private static final class SessionEventsTask implements Task { - - public static final int TYPE = 1; - - private final SessionInfo session; - - private final List<SessionEvent> events = new LinkedList<>(); - - public SessionEventsTask(SessionInfo session) { - this.session = session; - } - - @Override - public int type() { - return TYPE; - } - - } - - private static final class DataLoopEndTask implements Task { - - public static final DataLoopEndTask INSTANCE = new DataLoopEndTask(); - - public static final int TYPE = 2; - - @Override - public int type() { - return TYPE; - } - - } - - private static final class DataEndTask implements Task { - - public static final DataEndTask INSTANCE = new DataEndTask(); - - public static final int TYPE = 3; - - @Override - public int type() { - return TYPE; - } - - } - - private static final class TasksTimeWindow { - - private final long startTime; - - private final long endTime; - - private final List<Task> tasks = new LinkedList<>(); - - private TasksTimeWindow(long time, long period) { - int factor = (int) (time / period); - startTime = factor * period; - endTime = (factor + 1) * period; - } - - public long startTime() { - return startTime; - } - - public long endTime() { - return endTime; - } - - public boolean inTimeRange(long time) { - return (startTime <= time && time < endTime); - } - - public void add(Task value) { - tasks.add(value); - } - - private SessionEventsTask createSessionEventsTask(SessionInfo session) { - SessionEventsTask task = new SessionEventsTask(session); - tasks.add(task); - return task; - } - - public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) { - if (tasks.isEmpty()) { - if (create) { - return createSessionEventsTask(session); - } - - return null; - } - - for (Task task : tasks) { - if (task.type() == SessionEventsTask.TYPE) { - SessionEventsTask sessionTask = (SessionEventsTask) task; - if (sessionTask.session.equals(session)) { - return sessionTask; - } - } - } - - if (create) { - return createSessionEventsTask(session); - } - - return null; - } - - public List<Task> values() { - return tasks; - } - - public long getPeroid() { - return (endTime - startTime); - } - - @Override - public String toString() { - return "TimeWindow{" - + "startTimestamp=" + startTime - + ", endTimestamp=" + endTime + '}'; - } - - @Override - public int hashCode() { - int hash = 5; - hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32)); - hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32)); - return hash; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (!(obj instanceof TasksTimeWindow)) { - return false; - } - - final TasksTimeWindow other = (TasksTimeWindow) obj; - return this.startTime == other.startTime - && this.endTime == other.endTime; - } - - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Thu May 30 10:26:34 2019 +0200 @@ -273,9 +273,9 @@ worker.setListeners(listeners); worker.setFilterChain(filterChain.instanceForWorker(i)); - if (worker instanceof HttpFlowBasedClientWorker) { - ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor); - } +// if (worker instanceof HttpFlowBasedClientWorker) { +// ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor); +// } worker.setCollectMetrics(collectMetrics); worker.setConnectPartialSession(connectPartialSession);
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,683 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.time.TimeAware; -import com.passus.commons.time.TimeGenerator; -import com.passus.data.ByteBuff; -import com.passus.data.DataDecoder; -import com.passus.data.HeapByteBuff; -import com.passus.net.http.HttpFullMessageDecoder; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpRequestEncoder; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.Event; -import com.passus.st.client.FlowContext; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.http.filter.HttpFilter; -import com.passus.st.client.http.filter.HttpFlowUtils; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.metric.MetricsContainer; -import org.apache.logging.log4j.Level; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import static com.passus.st.client.FlowContext.*; -import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; -import static com.passus.st.client.http.HttpConsts.*; -import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware { - - protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); - - private final Set<SessionInfo> blockedSessions = new HashSet<>(); - - private final Map<Integer, Long> timeouts = new HashMap<>(); - - private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder(); - - protected final Object lock = new Object(); - - protected volatile boolean working = false; - - private long checkTimeoutsPeriod = 5_000; - - private long nextCheckTimeoutsTime = -1; - - private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; - - private long lastEventTimestamp = -1; - - protected final HttpScopes scopes = new HttpScopes(); - - protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - - public HttpFlowBasedClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - timeouts.putAll(DEFAULT_TIMEOUTS); - } - - @Override - public boolean isWorking() { - return working; - } - - @Override - public int activeConnections() { - int count = 0; - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() != STATE_DISCONNECTED) { - count++; - } - } - } - - return count; - } - - @Override - public TimeGenerator getTimeGenerator() { - return timeGenerator; - } - - @Override - public void setTimeGenerator(TimeGenerator timeGenerator) { - Assert.notNull(timeGenerator, "timeGenerator"); - this.timeGenerator = timeGenerator; - } - - protected final void addBlockedSession(SessionInfo session) { - blockedSessions.add(session); - } - - protected final boolean isBlockedSession(SessionInfo session) { - return !blockedSessions.isEmpty() && blockedSessions.contains(session); - } - - public float getSleepFactor() { - return sleepFactor; - } - - public void setSleepFactor(float sleepFactor) { - Assert.greaterOrEqualZero(sleepFactor, "sleepFactor"); - this.sleepFactor = sleepFactor; - } - - public long getCheckTimeoutsPeriod() { - return checkTimeoutsPeriod; - } - - public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) { - Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod"); - this.checkTimeoutsPeriod = checkTimeoutsPeriod; - } - - protected final void changeFlowState(FlowContext flowContext, int state) { - try { - if (flowContext.state() == state) { - return; - } - - int oldState = flowContext.state(); - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow status changing {} -> {}.", - contextStateToString(flowContext.state()), - contextStateToString(state) - ); - } - - switch (state) { - case FlowContext.STATE_CONNECTING: - flowContext.clear(); - break; - case FlowContext.STATE_CONNECTED: - HttpFullMessageDecoder decoder = new HttpFullMessageDecoder(); - decoder.setDecodeRequest(false); - - flowContext.decoder(new HttpFullMessageDecoder()); - flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); - break; - case FlowContext.STATE_ERROR: - changeFlowState(flowContext, STATE_DISCONNECTED); - break; - case FlowContext.STATE_RESP_RECEIVED: - flowContext.sentEvent(null); - flowContext.receivedStartTimestamp(-1); - break; - case FlowContext.STATE_DISCONNECTING: - if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { - if (flowContext.channelContext() != null) { - try { - flowContext.channelContext().close(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } else { - changeFlowState(flowContext, STATE_DISCONNECTED); - } - } else { - return; - } - break; - case STATE_DISCONNECTED: - flowContext.sentEvent(null); - flowContext.state(STATE_DISCONNECTED); - flowContext.timeout(-1); - flowContext.clear(); - removeFlowContext(flowContext); - flowStateChanged(flowContext, oldState); - return; - } - - long timeout = timeouts.get(flowContext.state()); - flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); - flowContext.state(state); - flowStateChanged(flowContext, oldState); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - } - - protected void flowStateChanged(FlowContext context, int oldState) { - - } - - protected FlowContext flowContext(SessionEvent event) { - return flowContext(event.getSessionInfo()); - } - - protected FlowContext flowContext(ChannelContext context) { - return flowContext(context.getSessionInfo()); - } - - protected FlowContext flowContext(SessionInfo session) { - FlowContext context = sessions.get(session); - if (context == null) { - if (logger.isDebugEnabled()) { - logger.debug("Context for session '" + session + "' not found."); - } - } - - return context; - } - - protected FlowContext createFlowContext(SessionInfo session) { - return HttpFlowUtils.createFlowContext(session, scopes); - } - - protected FlowContext register(SessionEvent sessionEvent) { - return register(sessionEvent.getSessionInfo()); - } - - protected FlowContext register(SessionInfo session) { - synchronized (lock) { - if (sessions.containsKey(session)) { - logger.warn("Unable to register session '" + session + "'. Session already registered."); - return null; - } - - FlowContext flowContext = createFlowContext(session); - sessions.put(session, flowContext); - return flowContext; - } - } - - protected FlowContext connect(SessionEvent sessionEvent) { - return connect(sessionEvent.getSessionInfo()); - } - - protected FlowContext connect(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = register(session); - if (flowContext != null) { - emitter.connect(session, this, index); - return flowContext; - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - return null; - } - } - - @Override - public void close() { - synchronized (lock) { - for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { - FlowContext flowContext = entry.getValue(); - try { - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } - } - } - - sessions.clear(); - working = false; - } - } - - protected void close(SessionEvent sessionEvent) { - close(sessionEvent.getSessionInfo()); - } - - protected void close(FlowContext flowContext) { - synchronized (lock) { - try { - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - @Override - public void close(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = flowContext(session); - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - protected void closeSession(FlowContext flowContext) { - synchronized (lock) { - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - } - } - - protected void removeFlowContext(FlowContext flowContext) { - synchronized (lock) { - debug(flowContext, "removeFlowContext"); - sessions.remove(flowContext.sessionInfo()); - } - } - - protected void reconnect(FlowContext flowContext) { - synchronized (lock) { - try { - if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state())); - } - - SessionInfo session = flowContext.sessionInfo(); - changeFlowState(flowContext, FlowContext.STATE_CONNECTING); - emitter.connect(session, this, index); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - } - - protected void closeAllConnections() { - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { - closeSession(flowContext); - } - } - } - - private void sleepSilently(long millis) { - if (millis == 0) { - return; - } - if (millis < 0) { - logger.warn("Cannot sleep for negative interval: {}."); - return; - } - logger.debug("Going sleep for: {}.", millis); - try { - Thread.sleep(millis); - } catch (InterruptedException ignore) { - } - } - - protected void sleep(Event event) { - if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { - if (lastEventTimestamp != -1) { - long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); - sleepSilently(timeToSleep); - } - lastEventTimestamp = event.getTimestamp(); - } - } - - @Override - public void writeMetrics(MetricsContainer container) { - synchronized (lock) { - super.writeMetrics(container); - } - } - - @Override - public void channelActive(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); - } - - flowContext.channelContext(context); - changeFlowState(flowContext, STATE_CONNECTED); - } - - lock.notifyAll(); - } - } - - @Override - public void channelInactive(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); - } - - changeFlowState(flowContext, STATE_DISCONNECTED); - } - lock.notifyAll(); - } - } - - @Override - public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { - HttpFullMessageDecoder decoder = (HttpFullMessageDecoder) flowContext.decoder(); - HttpRequest req = null; - if (flowContext.sentEvent() != null) { - req = ((HttpSessionPayloadEvent) flowContext.sentEvent()).getRequest(); - } - - if (req != null) { - decoder.setRequestMethod(req.getMethod()); - } - - decoder.decode(data); - long now = timeGenerator.currentTimeMillis(); - if (flowContext.receivedStartTimestamp() == -1) { - flowContext.receivedStartTimestamp(now); - } - - if (decoder.state() == DataDecoder.STATE_ERROR) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Decoder error. " + decoder.getLastError()); - } - - decoder.clear(); - if (req != null) { - extractHttpContext(flowContext).scopes().removeConversation(req); - } - - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } else if (decoder.state() == DataDecoder.STATE_FINISHED) { - HttpResponse resp = (HttpResponse) decoder.getResult(); - - if (logger.isDebugEnabled()) { - debug(flowContext, - "Response decoded (size: {} B, downloaded: {} ms, status: {})", - decoder.getHeaderSize() + decoder.getContentSize(), - now - flowContext.receivedStartTimestamp(), - resp.getStatus().getCode() - ); - } - - if (collectMetric) { - synchronized (metric) { - metric.incResponsesNum(); - metric.addResponseStatusCode(resp.getStatus().getCode()); - metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); - metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); - if (req != null) { - metric.addResponseTime(now - (long) req.getTag(TAG_TIME_START)); - } - } - } - - resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize()); - resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize()); - resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp()); - resp.setTag(TAG_TIME_END, now); - - if (filterChain.filterInbound(req, resp, flowContext) != HttpFilter.DENY) { - try { - fireResponseReceived(req, resp, flowContext); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - - if (req != null) { - extractHttpContext(flowContext).scopes().removeConversation(req); - } - decoder.clear(); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } - } - - lock.notifyAll(); - } - } - - @Override - public void dataWriteStart(ChannelContext context) { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); - flowContext.sendStartTimestamp(now); - ((HttpSessionPayloadEvent) flowContext.sentEvent()).getRequest().setTag(TAG_TIME_START, now); - } - } - } - - @Override - public void dataWritten(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); - if (collectMetric) { - synchronized (metric) { - metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); - } - } - - ((HttpSessionPayloadEvent) (flowContext.sentEvent())).getRequest().setTag(TAG_TIME_END, now); - } - - lock.notifyAll(); - - } - } - - @Override - public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Error occured. " + cause.getMessage(), cause); - } - - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_ERROR); - } - - lock.notifyAll(); - } - } - - protected boolean send(FlowContext context, HttpSessionPayloadEvent event) { - synchronized (lock) { - if (event.getRequest() != null) { - HttpRequest req = event.getRequest(); - if (filterChain.filterOutbound(req, event.getResponse(), context) == HttpFilter.DENY) { - return false; - } - - reqEncoder.encodeHeaders(req, context.buffer()); - long headerSize = context.buffer().readableBytes(); - reqEncoder.encodeContent(req, context.buffer()); - - req.setTag(TAG_HEADER_SIZE, headerSize); - req.setTag(TAG_CONTENT_SIZE, context.buffer().readableBytes() - headerSize); - - if (collectMetric) { - synchronized (metric) { - metric.incRequestsNum(); - metric.addRequestSize(context.buffer().readableBytes()); - //metric.addRequestUrl(req.getUrl()); - } - } - - try { - changeFlowState(context, FlowContext.STATE_REQ_SENT); - context.sentEvent(event); - - context.channelContext().writeAndFlush(context.buffer()); - if (logger.isDebugEnabled()) { - debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer().length()); - } - - context.buffer().clear(); - - return true; - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(context, e.getMessage(), e); - } - } - } - } - - return false; - } - - protected void processTimeouts() { - synchronized (lock) { - try { - long now = timeGenerator.currentTimeMillis(); - if (nextCheckTimeoutsTime == -1) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - } else if (nextCheckTimeoutsTime > now) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.timeouted()) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow for session '{}' timed out (state '{}').", - flowContext.sessionInfo(), - contextStateToString(flowContext.state())); - } - - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTING: - case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_REQ_SENT: - case FlowContext.STATE_ERROR: - closeSession(flowContext); - break; - case FlowContext.STATE_RESP_RECEIVED: - //Dziwny blad nie powinien wystepowac - break; - case FlowContext.STATE_DISCONNECTING: - case STATE_DISCONNECTED: - removeFlowContext(flowContext); - break; - } - } - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - protected Event eventInstanceForWorker(Event event) { - if (event instanceof SessionEvent) { - Event newEvent = ((SessionEvent) event).instanceForWorker(index); - newEvent.setTimestamp(event.getTimestamp()); - return newEvent; - } else { - return event; - } - } - - protected final void trace(FlowContext flowContext, String message, Object... args) { - log(flowContext, Level.TRACE, message, args); - } - - protected final void debug(FlowContext flowContext, String message, Throwable cause) { - log(flowContext, Level.DEBUG, message, cause); - } - - protected final void error(FlowContext flowContext, String message, Throwable cause) { - log(flowContext, Level.ERROR, message, cause); - } - - protected final void log(FlowContext flowContext, Level level, String message, Throwable cause) { - message = String.format("%s [%s]", message, flowContext.sessionInfo()); - logger.log(level, message, cause); - } - - protected final void debug(FlowContext flowContext, String message, Object... args) { - log(flowContext, Level.DEBUG, message, args); - } - - protected final void log(FlowContext flowContext, Level level, String message, Object... args) { - if (args.length > 0) { - message = String.format(message, args); - } - - SessionInfo session = flowContext.sessionInfo(); - if (args.length == 0) { - logger.log(level, message + " [{}]", session); - } else { - Object[] logArgs = new Object[args.length + 1]; - for (int i = 0; i < args.length; i++) { - logArgs[i] = args[i]; - } - - logArgs[logArgs.length - 1] = session; - logger.log(level, message + " [{}]", logArgs); - } - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,48 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.st.client.DataEvents; -import com.passus.st.client.Event; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; - -/** - * - * @author Mirosław Hawrot - */ -@Deprecated -public class HttpNullClientWorker extends HttpClientWorker { - - private boolean working = true; - - public HttpNullClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - } - - @Override - public boolean isWorking() { - return working; - } - - @Override - public int activeConnections() { - return 0; - } - - @Override - public void close() { - working = false; - } - - @Override - public void close(SessionInfo session) { - - } - - @Override - public void handle(Event event) { - if (event.getType() == DataEvents.DataLoopEnd.TYPE) { - working = false; - } - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,319 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.st.client.DataEvents.DataEnd; -import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.Event; -import com.passus.st.client.FlowContext; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.plugin.PluginConstants; - -import java.util.*; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.Semaphore; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -@Plugin(name = HttpParallelClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) -public final class HttpParallelClientWorker extends HttpFlowBasedClientWorker { - - public static final String TYPE = "parallel"; - - public static final int DEFAULT_MAX_SENT_REQUESTS = 10; - - private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); - - private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS; - - private long eventsQueueWaitTime = 100; - - private final Semaphore semaphore = new Semaphore(maxSentRequests); - - private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>(); - - private boolean closeAllConnections = false; - - public HttpParallelClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - } - - public int getMaxSentRequests() { - return maxSentRequests; - } - - public void setMaxSentRequests(int maxSentRequests) { - Assert.greaterThanZero(maxSentRequests, "maxSentRequests"); - this.maxSentRequests = maxSentRequests; - } - - @Override - protected LocalFlowContext flowContext(SessionInfo session) { - return (LocalFlowContext) super.flowContext(session); - } - - @Override - protected LocalFlowContext flowContext(ChannelContext context) { - return flowContext(context.getSessionInfo()); - } - - @Override - protected LocalFlowContext flowContext(SessionEvent event) { - return flowContext(event.getSessionInfo()); - } - - public long getEventsQueueWaitTime() { - return eventsQueueWaitTime; - } - - public void setEventsQueueWaitTime(long eventsQueueWaitTime) { - Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime"); - this.eventsQueueWaitTime = eventsQueueWaitTime; - } - - @Override - protected void removeFlowContext(FlowContext flowContext) { - if (flowContext != null) { - flowIndex.remove(flowContext); - } - } - - @Override - protected LocalFlowContext createFlowContext(SessionInfo session) { - LocalFlowContext flowContext = new LocalFlowContext(session, scopes); - flowIndex.add(flowContext); - return flowContext; - } - - private void waitCloseAllConnections() { - closeAllConnections = true; - synchronized (lock) { - while (!flowIndex.isEmpty()) { - try { - lock.wait(10); - } catch (InterruptedException ignore) { - } - } - } - - closeAllConnections = false; - } - - @Override - protected boolean send(FlowContext flowContext, HttpSessionPayloadEvent event) { - //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy - //najmniej uzywane. - if (flowIndex.size() > maxSentRequests) { - int diff = flowIndex.size() - maxSentRequests; - if (logger.isDebugEnabled()) { - debug(flowContext, "Too many connections {}.", flowIndex.size()); - } - - Iterator<LocalFlowContext> it = flowIndex.descendingIterator(); - while (it.hasNext()) { - LocalFlowContext indexFlowContext = it.next(); - if (indexFlowContext.eventsQueue.isEmpty() - && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) { - close(flowContext); - if (--diff == 0) { - break; - } - } - } - } - - return super.send(flowContext, event); - } - - private boolean canSend(FlowContext flowContext) { - int state = flowContext.state(); - return (state == FlowContext.STATE_CONNECTED - || state == FlowContext.STATE_RESP_RECEIVED - || state == FlowContext.STATE_ERROR - || state == FlowContext.STATE_REQ_SENT); - } - - @Override - protected void flowStateChanged(FlowContext flowContext, int oldState) { - LocalFlowContext localFlowContext = (LocalFlowContext) flowContext; - if (oldState == FlowContext.STATE_REQ_SENT) { - if (semaphore.availablePermits() <= maxSentRequests) { - semaphore.release(); - } - } - - if (closeAllConnections) { - if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && localFlowContext.state() != FlowContext.STATE_REQ_SENT - && localFlowContext.eventsQueue.isEmpty()) { - close(flowContext); - return; - } - } - - if (localFlowContext.state() >= FlowContext.STATE_CONNECTED - && localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && localFlowContext.state() != FlowContext.STATE_REQ_SENT - && !localFlowContext.eventsQueue.isEmpty()) { - - Event event = localFlowContext.eventsQueue.peek(); - if (event.getType() == SessionStatusEvent.TYPE) { - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - localFlowContext.eventsQueue.poll(); - close((SessionStatusEvent) event); - } - } else if (event.getType() == HttpSessionPayloadEvent.TYPE - && canSend(flowContext)) { - localFlowContext.eventsQueue.poll(); - send(flowContext, (HttpSessionPayloadEvent) event); - } else { - localFlowContext.eventsQueue.poll(); - } - } - } - - private void makeFirst(LocalFlowContext flowContext) { - synchronized (lock) { - flowIndex.remove(flowContext); - flowIndex.addFirst(flowContext); - } - } - - private void addToQueue(LocalFlowContext flowContext, Event event) { - flowContext.eventsQueue.add(event); - makeFirst(flowContext); - } - - @Override - public void handle(Event event) { - Event newEvent = null; - switch (event.getType()) { - case HttpSessionPayloadEvent.TYPE: - semaphore.acquireUninterruptibly(); - newEvent = eventInstanceForWorker(event); - break; - case SessionStatusEvent.TYPE: - case DataLoopEnd.TYPE: - case DataEnd.TYPE: - newEvent = event; - } - - if (newEvent != null) { - synchronized (lock) { - try { - eventsQueue.put(newEvent); - lock.notifyAll(); - } catch (Exception e) { - logger.debug("Unable to add event to queue. " + e.getMessage(), e); - } - } - } - } - - private void processEvent(Event event) { - if (event instanceof SessionEvent) { - switch (event.getType()) { - case SessionStatusEvent.TYPE: - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - LocalFlowContext flowContext = flowContext((SessionEvent) event); - if (flowContext != null) { - if (flowContext.eventsQueue.isEmpty() - && flowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); - } else { - addToQueue(flowContext, event); - } - } - } - break; - case HttpSessionPayloadEvent.TYPE: { - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - LocalFlowContext flowContext = flowContext(payloadEvent); - if (flowContext != null) { - if (flowContext.state() >= FlowContext.STATE_CONNECTING - && flowContext.state() < FlowContext.STATE_DISCONNECTING) { - if (flowContext.eventsQueue.isEmpty() - && (flowContext.state() == FlowContext.STATE_CONNECTED - || flowContext.state() == FlowContext.STATE_ERROR - || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) { - send(flowContext, payloadEvent); - } else { - addToQueue(flowContext, event); - } - } - } else { - try { - SessionInfo session = payloadEvent.getSessionInfo(); - flowContext = (LocalFlowContext) register(session); - addToQueue(flowContext, event); - emitter.connect(session, this, index); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - - break; - } - } - - } else if (event.getType() == DataLoopEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataLoopEnd received."); - } - - waitCloseAllConnections(); - filterChain.reset(); - } else if (event.getType() == DataEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataEnd received. Deactivation."); - } - - working = false; - } - - } - - @Override - public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - try { - lock.wait(eventsQueueWaitTime); - } catch (InterruptedException ignore) { - } - - Event event; - while ((event = eventsQueue.poll()) != null) { - processEvent(event); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - } - - protected static class LocalFlowContext extends FlowContext { - - private final Queue<Event> eventsQueue; - - private LocalFlowContext(SessionInfo session, HttpScopes scopes) { - super(session); - setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(this, scopes)); - eventsQueue = new LinkedList<>(); - } - - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.net.http.HttpMessageHelper; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.emitter.SessionInfo; - -import static com.passus.st.Protocols.HTTP; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpRequest, HttpResponse> { - - public static final int TYPE = 12; - - public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) { - super(sessionInfo, req, resp, HTTP, sourceName); - } - - @Override - public int getType() { - return TYPE; - } - - @Override - public String toString() { - String req = null; - String resp = null; - HttpMessageHelper helper = HttpMessageHelper.get(); - - if (getRequest() != null) { - req = helper.firstLineToString(getRequest()); - } - - if (getResponse() != null) { - resp = helper.firstLineToString(getResponse()); - } - - StringBuilder sb = new StringBuilder(); - sb.append("HttpSessionPayloadEvent{") - .append("timestamp=").append(getTimestamp()).append(", ") - .append("session=").append(getSessionInfo()).append(", ") - .append("request=").append(req).append(", ") - .append("response=").append(resp).append("}"); - return sb.toString(); - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,74 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.net.http.HttpHeaders; -import com.passus.net.http.HttpRequest; -import com.passus.st.client.DataEvents; -import com.passus.st.client.Event; -import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public class HttpStresserWorker extends HttpClientWorker { - - private int maxRequests = -1; - - private int sentRequests = 0; - - private boolean working = true; - - public HttpStresserWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - } - - @Override - public boolean isWorking() { - return working; - } - - @Override - public int activeConnections() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public void close() { - - } - - private void send(HttpRequest req) { - req.getHeaders().delete(HttpHeaders.KEEP_ALIVE); - - } - - @Override - public void close(SessionInfo session) { - throw new UnsupportedOperationException("Not supported yet."); - } - - private void waitAndClose() { - working = false; - } - - @Override - public void handle(Event event) { - if (event.getType() == DataEvents.DataLoopEnd.TYPE) { - waitAndClose(); - } else if (event.getType() != SessionPayloadEvent.TYPE) { - return; - } - - SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; - HttpRequest req = (HttpRequest) payloadEvent.getRequest(); - send(req); - } - - @Override - public void run() { - - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,303 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.st.client.DataEvents.DataEnd; -import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.Event; -import com.passus.st.client.FlowContext; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.plugin.PluginConstants; - -import java.io.IOException; -import java.util.concurrent.LinkedBlockingDeque; - -import static com.passus.st.client.FlowContext.contextStateToString; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -@Plugin(name = HttpSynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) -public class HttpSynchClientWorker extends HttpFlowBasedClientWorker { - - public static final String TYPE = "synch"; - - private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); - - private long eventsQueueWaitTime = 100; - - /** - * Context dla ktorego wykonywana jest operacja. - */ - private FlowContext currFlowContext; - - private boolean loopEnd = false; - - private int loop = 0; - - public HttpSynchClientWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - } - - @Override - public boolean isWorking() { - return working; - } - - public long getEventsQueueWaitTime() { - return eventsQueueWaitTime; - } - - public void setEventsQueueWaitTime(long eventsQueueWaitTime) { - Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime"); - this.eventsQueueWaitTime = eventsQueueWaitTime; - } - - @Override - public void sessionInvalidated(SessionInfo session) throws Exception { - synchronized (lock) { - if (logger.isDebugEnabled()) { - logger.debug("Session {} invalidated.", session); - } - - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - - addBlockedSession(session); - lock.notifyAll(); - } - } - - @Override - protected void flowStateChanged(FlowContext context, int oldState) { - if (logger.isDebugEnabled()) { - logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state())); - } - - if (context == currFlowContext) { - if (context.state() == FlowContext.STATE_CONNECTED - || context.state() == FlowContext.STATE_RESP_RECEIVED - || context.state() == FlowContext.STATE_ERROR - || context.state() == FlowContext.STATE_DISCONNECTED) { - currFlowContext = null; - } - } - } - - @Override - public void handle(Event event) { - Event newEvent = eventInstanceForWorker(event); - synchronized (lock) { - try { - eventsQueue.put(newEvent); - } catch (Exception e) { - logger.debug("Unable to add event to queue. " + e.getMessage(), e); - } - - lock.notifyAll(); - } - } - - @Override - protected void closeAllConnections() { - synchronized (lock) { - boolean wait; - do { - wait = false; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() == FlowContext.STATE_REQ_SENT) { - wait = true; - break; - } - } - - if (wait) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } while (wait); - - super.closeAllConnections(); - while (!sessions.isEmpty()) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } - } - - /** - * Returns true if next event should be processed immediately. - * - * @return boolean - */ - private boolean pollNext() { - if (currFlowContext != null) { - return false; - } - - Event event = eventsQueue.poll(); - if (event != null) { - sleep(event); - if (logger.isTraceEnabled()) { - logger.trace("Event processing: {}", event); - } - - if (event instanceof SessionEvent) { - SessionEvent sessEvent = (SessionEvent) event; - if (isBlockedSession(sessEvent.getSessionInfo())) { - return true; - } - - if (event.getType() == SessionStatusEvent.TYPE) { - SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - try { - currFlowContext = register(statusEvent); - if (currFlowContext != null) { - currFlowContext.loop(loop); - emitter.connect(statusEvent.getSessionInfo(), this, index); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - - return (currFlowContext == null); - } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - currFlowContext = flowContext((SessionEvent) event); - if (currFlowContext != null) { - if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); - } - } - } - - return true; - } else if (event.getType() == HttpSessionPayloadEvent.TYPE) { - FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_RESP_RECEIVED: - case FlowContext.STATE_ERROR: - currFlowContext = flowContext; - if (send(flowContext, (HttpSessionPayloadEvent) event)) { - return false; - } else { - currFlowContext = null; - return true; - } - case FlowContext.STATE_DISCONNECTING: - case FlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - currFlowContext = register(sessEvent); - if (currFlowContext != null) { - try { - currFlowContext.loop(loop); - emitter.connect(sessEvent.getSessionInfo(), this, index); - } catch (IOException e) { - logger.error(e.getMessage(), e); - currFlowContext = null; - } - } - - return false; - } else { - return true; - } - default: - return false; - } - } else if (connectPartialSession) { - currFlowContext = register(sessEvent); - if (currFlowContext != null) { - try { - currFlowContext.loop(loop); - emitter.connect(sessEvent.getSessionInfo(), this, index); - eventsQueue.addFirst(sessEvent); - } catch (IOException e) { - logger.error(e.getMessage(), e); - currFlowContext = null; - } - - return false; - } else { - return true; - } - } - - return true; - } else { - return true; - } - } else if (event.getType() == DataLoopEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataLoopEnd received."); - } - - loopEnd = true; - closeAllConnections(); - filterChain.reset(); - loop = currFlowContext.loop() + 1; - loopEnd = false; - return true; - } else if (event.getType() == DataEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataEnd received. Deactivation."); - } - - working = false; - } - } - - return false; - } - - @Override - public void close() { - synchronized (lock) { - eventsQueue.clear(); - super.close(); - working = false; - lock.notifyAll(); - } - } - - @Override - public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - try { - lock.wait(eventsQueueWaitTime); - } catch (InterruptedException ignore) { - } - - boolean nextPoll; - do { - if (loopEnd || !working) { - break; - } - - nextPoll = pollNext(); - } while (nextPoll); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - } - -}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Thu May 30 10:26:34 2019 +0200 @@ -2,25 +2,15 @@ import com.passus.commons.Assert; import com.passus.data.*; -import com.passus.net.http.HttpCachedHeadersImpl; -import com.passus.net.http.HttpConsts; -import com.passus.net.http.HttpHeaders; -import com.passus.net.http.HttpHeadersDecoder; -import com.passus.net.http.HttpHeadersImpl; -import com.passus.net.http.HttpMessage; -import com.passus.net.http.HttpMethod; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.HttpStatus; +import com.passus.net.http.*; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.http.HttpReqResp; -import com.passus.st.client.http.HttpSessionPayloadEvent; -import static com.passus.st.reader.nc.NcHttpDataUtils.CUSTOM_HEADER_CODE; -import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; -import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE; + import java.io.IOException; +import static com.passus.st.reader.nc.NcHttpDataUtils.*; + /** - * * @author Mirosław Hawrot */ public class HttpSessionPayloadEventDataReader { @@ -113,7 +103,7 @@ if (headerSize > 0) { int startIndex = buffer.startIndex(); - for (;;) { + for (; ; ) { byte headerCode = buffer.read(); ByteString headerName; if (headerCode == CUSTOM_HEADER_CODE) { @@ -226,7 +216,7 @@ return new HttpReqResp(req, resp); } - public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException { + public SessionPayloadEvent read(NcDataBlockReader reader) throws IOException { return null; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java Thu May 30 10:26:34 2019 +0200 @@ -0,0 +1,76 @@ +package com.passus.st.client; + +import com.passus.commons.service.ServiceUtils; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Properties; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +public class AsynchFlowWorkerTest extends AbstractWireMockTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @BeforeMethod + public void beforeMethod() { + String content = "test"; + stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + } + + @Test(enabled = false) + public void testHandle() throws Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); + assertEquals(4, events.size()); + + NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); + emitter.start(); + + TestHttpClientListener listener = new TestHttpClientListener(); + + AsynchFlowWorker worker = new AsynchFlowWorker(emitter, "test", 0); + try { + worker.setListener(listener); + worker.start(); + + SessionEvent sessionEvent = (SessionEvent) events.get(0); + SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); + statusEvent.setTimestamp(sessionEvent.getTimestamp()); + worker.handle(statusEvent); + + events.forEach(worker::handle); + + worker.join(); + assertTrue(listener.size() > 0); + assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); + TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0); + String responseStr = event.getResponse().toString(); + assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); + assertTrue(responseStr.endsWith("test")); + } finally { + ServiceUtils.stopQuietly(emitter); + } + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Thu May 30 10:26:34 2019 +0200 @@ -0,0 +1,72 @@ +package com.passus.st.client; + +import com.passus.commons.service.ServiceUtils; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Properties; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +public class ParallelFlowWorkerTest extends AbstractWireMockTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @BeforeMethod + public void beforeMethod() { + String content = "test"; + stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + } + + @Test + public void testHandle() throws Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); + assertEquals(4, events.size()); + + NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); + emitter.start(); + + TestHttpClientListener listner = new TestHttpClientListener(); + + ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0); + try { + worker.setListener(listner); + worker.start(); + + events.forEach(worker::handle); + + worker.join(); + assertTrue(listner.size() > 0); + assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); + TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); + String responseStr = event.getResponse().toString(); + assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); + assertTrue(responseStr.endsWith("test")); + } finally { + ServiceUtils.stopQuietly(emitter); + } + + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu May 30 10:26:34 2019 +0200 @@ -0,0 +1,241 @@ +package com.passus.st.client; + +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.net.http.HttpResponseEncoder; +import com.passus.st.emitter.*; +import com.passus.st.metric.MetricsContainer; +import com.passus.st.utils.EventUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Queue; + +import static com.passus.st.utils.Assert.assertHttpClientEvents; +import static org.testng.AssertJUnit.assertEquals; + +public class SynchFlowWorkerTest { + + private final TestHttpClientListener listener = new TestHttpClientListener(); + + private static class LocalEmitter implements Emitter { + + private SessionMapper sessionMapper; + + private boolean started = false; + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + @Override + public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { + LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); + try { + handler.channelRegistered(channelContext); + handler.channelActive(channelContext); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + protected void flush(LocalChannelContext channelContext) { + SessionInfo sessionInfo = channelContext.getSessionInfo(); + SynchFlowWorker clientWorker = (SynchFlowWorker) channelContext.handler; + FlowContext flowContext = clientWorker.flowContext(sessionInfo); + SessionPayloadEvent event = flowContext.sentEvent(); + + HttpRequest request = (HttpRequest) event.getRequest(); + HttpResponse response = (HttpResponse) event.getResponse(); + ByteBuff buff = new HeapByteBuff(); + HttpResponseEncoder.INSTANCE.encode(response, buff); + try { + clientWorker.dataReceived(channelContext, buff); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + protected void close(LocalChannelContext channelContext) { + try { + channelContext.handler.channelInactive(channelContext); + channelContext.handler.channelUnregistered(channelContext); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (sessionMapper == null) { + sessionMapper = new PassThroughSessionMapper(); + } + + started = true; + } + + @Override + public void stop() { + started = false; + } + + @Override + public boolean isCollectMetrics() { + return false; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + } + + @Override + public void writeMetrics(MetricsContainer container) { + } + + @Override + public void configure(Configuration config, ConfigurationContext context) { + } + } + + private static class LocalChannelContext implements ChannelContext { + + private final LocalEmitter emitter; + + private final EmitterHandler handler; + + private final SessionInfo sessionInfo; + + private final Queue<ByteBuffer> dataQueue; + + private SocketAddress localAddress; + + private SocketAddress remoteAddress; + + public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { + this.emitter = emitter; + this.handler = handler; + this.remoteAddress = remoteAddress; + this.sessionInfo = sessionInfo; + this.dataQueue = new LinkedList<>(); + } + + @Override + public boolean isConnected() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isConnectionPending() { + throw new UnsupportedOperationException("Not supported yet."); + } + + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + addToQeueu(ByteBuffer.wrap(data, offset, length)); + } + + @Override + public void write(ByteBuff data) throws IOException { + addToQeueu(data.toNioByteBuffer()); + } + + @Override + public void flush() throws IOException { + emitter.flush(this); + } + + @Override + public void close() throws IOException { + emitter.close(this); + } + + @Override + public SocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + + } + + private SynchFlowWorker createWorker() { + LocalEmitter emitter = new LocalEmitter(); + SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0); + worker.setListener(listener); + return worker; + } + + private List<Event> readEvents(String pcapFile) throws Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + return EventUtils.readEvents(pcapFile, props); + } + + @AfterMethod + public void afterMethod() { + listener.clear(); + } + + @Test + public void testHandle_HTTP_SimpleRequestResponse() throws Exception { + List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); + assertEquals(4, events.size()); + + SynchFlowWorker worker = createWorker(); + worker.start(); + SessionEvent sessionEvent = (SessionEvent) events.get(0); + worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); + events.forEach(worker::handle); + worker.join(); + + assertHttpClientEvents(events, listener.events()); + } + + @Test + public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception { + List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); + assertEquals(4, events.size()); + + SynchFlowWorker worker = createWorker(); + worker.setConnectPartialSession(true); + worker.start(); + events.forEach(worker::handle); + worker.join(); + + assertHttpClientEvents(events, listener.events()); + } +} \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,86 +0,0 @@ -package com.passus.st.client.http; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import com.passus.commons.service.ServiceUtils; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.client.Event; -import com.passus.st.client.SessionEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.client.TestHttpClientListener; -import com.passus.st.emitter.RuleBasedSessionMapper; -import com.passus.st.emitter.nio.NioEmitter; -import com.passus.st.utils.EventUtils; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * - * @author Mirosław Hawrot - */ -public class HttpAsynchClientWorkerTest extends AbstractWireMockTest { - - private NioEmitter prepareEmitter(String mapperRule) throws Exception { - RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); - sessionMapper.addRule(mapperRule); - - NioEmitter emitter = new NioEmitter(); - emitter.setSessionMapper(sessionMapper); - return emitter; - } - - @BeforeMethod - public void beforeMethod() { - String content = "test"; - stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) - .willReturn(aResponse() - .withHeader("Content-Type", "text/plain") - .withHeader("Content-Length", "" + content.length()) - .withBody(content))); - } - - @Test(enabled = false) - public void testHandle() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); - List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); - assertEquals(4, events.size()); - - NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); - emitter.start(); - - TestHttpClientListener listner = new TestHttpClientListener(); - - HttpAsynchClientWorker worker = new HttpAsynchClientWorker(emitter, "test", 0); - try { - worker.setListeners(Arrays.asList(listner)); - worker.start(); - - SessionEvent sessionEvent = (SessionEvent) events.get(0); - SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); - statusEvent.setTimestamp(sessionEvent.getTimestamp()); - worker.handle(statusEvent); - - events.forEach(worker::handle); - - worker.join(); - assertTrue(listner.size() > 0); - assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); - TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); - String responseStr = event.getResponse().toString(); - assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); - assertTrue(responseStr.endsWith("test")); - } finally { - ServiceUtils.stopQuietly(emitter); - } - } - -}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpParallelClientWorkerTest.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -package com.passus.st.client.http; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import com.passus.commons.service.ServiceUtils; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.client.Event; -import com.passus.st.client.TestHttpClientListener; -import com.passus.st.emitter.RuleBasedSessionMapper; -import com.passus.st.emitter.nio.NioEmitter; -import com.passus.st.utils.EventUtils; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * - * @author Mirosław Hawrot - */ -public class HttpParallelClientWorkerTest extends AbstractWireMockTest { - - private NioEmitter prepareEmitter(String mapperRule) throws Exception { - RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); - sessionMapper.addRule(mapperRule); - - NioEmitter emitter = new NioEmitter(); - emitter.setSessionMapper(sessionMapper); - return emitter; - } - - @BeforeMethod - public void beforeMethod() { - String content = "test"; - stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) - .willReturn(aResponse() - .withHeader("Content-Type", "text/plain") - .withHeader("Content-Length", "" + content.length()) - .withBody(content))); - } - - @Test - public void testHandle() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); - List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); - assertEquals(4, events.size()); - - NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); - emitter.start(); - - TestHttpClientListener listner = new TestHttpClientListener(); - - HttpParallelClientWorker worker = new HttpParallelClientWorker(emitter, "test", 0); - try { - worker.setListeners(Arrays.asList(listner)); - worker.start(); - - events.forEach(worker::handle); - - worker.join(); - assertTrue(listner.size() > 0); - assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); - TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); - String responseStr = event.getResponse().toString(); - assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); - assertTrue(responseStr.endsWith("test")); - } finally { - ServiceUtils.stopQuietly(emitter); - } - - } - -}
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java Wed May 29 15:03:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,249 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.config.Configuration; -import com.passus.config.ConfigurationContext; -import com.passus.data.ByteBuff; -import com.passus.data.HeapByteBuff; -import com.passus.net.SocketAddress; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.HttpResponseEncoder; -import com.passus.st.client.*; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.PassThroughSessionMapper; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; -import com.passus.st.metric.MetricsContainer; -import com.passus.st.utils.EventUtils; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.Queue; -import org.testng.annotations.Test; -import static com.passus.st.utils.Assert.*; -import org.testng.annotations.AfterMethod; - -/** - * - * @author mikolaj.podbielski - * @author miroslaw.hawrot - */ -public class HttpSynchClientWorkerTest { - - private final TestHttpClientListener listner = new TestHttpClientListener(); - - private static class LocalEmitter implements Emitter { - - private SessionMapper sessionMapper; - - private boolean started = false; - - @Override - public void setSessionMapper(SessionMapper sessionMapper) { - this.sessionMapper = sessionMapper; - } - - @Override - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - @Override - public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { - LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); - try { - handler.channelRegistered(channelContext); - handler.channelActive(channelContext); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - protected void flush(LocalChannelContext channelContext) { - SessionInfo sessionInfo = channelContext.getSessionInfo(); - HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) channelContext.handler; - FlowContext flowContext = clientWorker.flowContext(sessionInfo); - HttpSessionPayloadEvent event = (HttpSessionPayloadEvent) flowContext.sentEvent(); - - HttpRequest request = event.getRequest(); - HttpResponse response = event.getResponse(); - ByteBuff buff = new HeapByteBuff(); - HttpResponseEncoder.INSTANCE.encode(response, buff); - try { - clientWorker.dataReceived(channelContext, buff); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - protected void close(LocalChannelContext channelContext) { - try { - channelContext.handler.channelInactive(channelContext); - channelContext.handler.channelUnregistered(channelContext); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - @Override - public boolean isStarted() { - return started; - } - - @Override - public void start() { - if(sessionMapper == null) { - sessionMapper = new PassThroughSessionMapper(); - } - - started = true; - } - - @Override - public void stop() { - started = false; - } - - @Override - public boolean isCollectMetrics() { - return false; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - } - - @Override - public void writeMetrics(MetricsContainer container) { - } - - @Override - public void configure(Configuration config, ConfigurationContext context) { - } - } - - private static class LocalChannelContext implements ChannelContext { - - private final LocalEmitter emitter; - - private final EmitterHandler handler; - - private final SessionInfo sessionInfo; - - private final Queue<ByteBuffer> dataQueue; - - private SocketAddress localAddress; - - private SocketAddress remoteAddress; - - public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { - this.emitter = emitter; - this.handler = handler; - this.remoteAddress = remoteAddress; - this.sessionInfo = sessionInfo; - this.dataQueue = new LinkedList<>(); - } - - @Override - public boolean isConnected() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isConnectionPending() { - throw new UnsupportedOperationException("Not supported yet."); - } - - private void addToQeueu(ByteBuffer buffer) throws IOException { - dataQueue.add(buffer); - } - - @Override - public void write(byte[] data, int offset, int length) throws IOException { - addToQeueu(ByteBuffer.wrap(data, offset, length)); - } - - @Override - public void write(ByteBuff data) throws IOException { - addToQeueu(data.toNioByteBuffer()); - } - - @Override - public void flush() throws IOException { - emitter.flush(this); - } - - @Override - public void close() throws IOException { - emitter.close(this); - } - - @Override - public SocketAddress getLocalAddress() { - return localAddress; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SessionInfo getSessionInfo() { - return sessionInfo; - } - - } - - @AfterMethod - public void afterMethod() { - listner.clear(); - } - - private HttpSynchClientWorker createWorker() { - LocalEmitter emitter = new LocalEmitter(); - HttpSynchClientWorker worker = new HttpSynchClientWorker(emitter, "test", 0); - worker.addListener(listner); - return worker; - } - - private List<Event> readEvents(String pcapFile) throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); - return EventUtils.readEvents(pcapFile, props); - } - - @Test(enabled = false) - public void testHandle_SimpleRequestResponse() throws Exception { - List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); - assertEquals(4, events.size()); - - HttpSynchClientWorker worker = createWorker(); - worker.start(); - SessionEvent sessionEvent = (SessionEvent) events.get(0); - worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); - events.forEach(worker::handle); - worker.join(); - - assertHttpClientEvents(events, listner.events()); - } - - @Test(enabled = false) - public void testHandle_SimpleRequestResponse_ConnectPartialSession() throws Exception { - List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); - assertEquals(4, events.size()); - - HttpSynchClientWorker worker = createWorker(); - worker.setConnectPartialSession(true); - worker.start(); - events.forEach(worker::handle); - worker.join(); - - assertHttpClientEvents(events, listner.events()); - } -}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Thu May 30 10:26:34 2019 +0200 @@ -1,26 +1,23 @@ package com.passus.st.reader.nc; -import static com.passus.data.DataSourceUtils.toByteBuff; import com.passus.data.HeapByteBuff; -import com.passus.net.http.HttpConsts; -import com.passus.net.http.HttpHeaders; -import com.passus.net.http.HttpHeadersImpl; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpRequestBuilder; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.HttpResponseBuilder; +import com.passus.net.http.*; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.http.HttpReqResp; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.SessionInfo; -import static com.passus.st.utils.HttpMessageAssert.*; +import org.testng.annotations.Test; + import java.io.File; import java.io.IOException; import java.util.UUID; + +import static com.passus.data.DataSourceUtils.toByteBuff; +import static com.passus.st.Protocols.HTTP; +import static com.passus.st.utils.HttpMessageAssert.assertMessages; +import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; import static org.testng.AssertJUnit.*; -import org.testng.annotations.Test; /** - * * @author Mirosław Hawrot */ public class HttpSessionPayloadEventDataWriterTest { @@ -56,7 +53,7 @@ HttpSessionPayloadEventDataWriter payloadWriter = new HttpSessionPayloadEventDataWriter(); try (NcDataBlockWriter writer = new NcDataBlockWriter(file)) { writer.open(); - payloadWriter.write(new HttpSessionPayloadEvent(session, req, resp, ""), writer); + payloadWriter.write(new SessionPayloadEvent(session, req, resp, HTTP, ""), writer); } }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Thu May 30 10:26:34 2019 +0200 @@ -4,7 +4,6 @@ import com.passus.st.client.Event; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.nc.NcDataBlock; import com.passus.st.reader.nc.NcSegmentBlock;
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Wed May 29 15:03:59 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Thu May 30 10:26:34 2019 +0200 @@ -1,7 +1,6 @@ package com.passus.st.source; import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.commons.utils.ResourceUtils; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.DataEvents.DataEnd;