Mercurial > stress-tester
changeset 956:b0867db5ea27
Refactorization in progress
line wrap: on
line diff
--- a/stress-tester/pom.xml Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/pom.xml Wed May 29 12:32:29 2019 +0200 @@ -105,6 +105,12 @@ </dependency> <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>8.2.2</version> + </dependency> + + <dependency> <groupId>com.passus</groupId> <artifactId>passus-lookup</artifactId> <version>1.0-SNAPSHOT</version>
--- a/stress-tester/src/main/java/com/passus/st/PcapScanner.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapScanner.java Wed May 29 12:32:29 2019 +0200 @@ -6,7 +6,7 @@ import com.passus.net.http.session.HttpSessionAnalyzer; import com.passus.st.client.Event; import com.passus.st.client.EventHandler; -import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.source.PcapSessionEventSource; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import static com.passus.st.Main.printHelp; +import static com.passus.st.Protocols.HTTP; import static com.passus.st.utils.CliUtils.option; /** @@ -74,8 +75,8 @@ @Override public void handle(Event event) { - if (event instanceof HttpSessionPayloadEvent) { - Object extracted = extractor.extract((HttpSessionPayloadEvent) event); + if (event instanceof SessionPayloadEvent) { + Object extracted = extractor.extract((SessionPayloadEvent) event); if (extracted != null) { values.add(extracted.toString()); // System.out.println(extracted.toString()); @@ -89,23 +90,26 @@ public interface Extractor { - public Object extract(HttpSessionPayloadEvent event); + Object extract(SessionPayloadEvent event); } static class PostExtractor implements Extractor { @Override - public Object extract(HttpSessionPayloadEvent event) { - HttpRequest request = event.getRequest(); - ByteString contentTypeBs = request.getHeaders().get(HttpHeaders.CONTENT_TYPE); - HttpMethod method = request.getMethod(); + public Object extract(SessionPayloadEvent event) { + if (event.getProtocolId() == HTTP) { + HttpRequest request = (HttpRequest) event.getRequest(); + ByteString contentTypeBs = request.getHeaders().get(HttpHeaders.CONTENT_TYPE); + HttpMethod method = request.getMethod(); - if (HttpMethod.POST.equals(method) && contentTypeBs != null) { - String contentType = 
contentTypeBs.toString(); - if (contentType.equals("application/x-www-form-urlencoded")) { - return request; + if (HttpMethod.POST.equals(method) && contentTypeBs != null) { + String contentType = contentTypeBs.toString(); + if (contentType.equals("application/x-www-form-urlencoded")) { + return request; + } } } + return null; } @@ -177,9 +181,9 @@ } } - private static Object urlExtract(HttpSessionPayloadEvent event, URLExtractor ue) { + private static Object urlExtract(SessionPayloadEvent event, URLExtractor ue) { try { - return ue.extract(URL.parse(event.getRequest().getUrl())); + return ue.extract(URL.parse(((HttpRequest) event.getRequest()).getUrl())); } catch (MalformedURLException ex) { ex.printStackTrace(System.err); return null; @@ -203,7 +207,7 @@ switch (type) { case "url": - return (e) -> e.getRequest().getUrl(); + return (e) -> ((HttpRequest) e.getRequest()).getUrl(); case "path": return (e) -> urlExtract(e, URL::getPath); case "query": @@ -213,9 +217,9 @@ case "reqPost": return new PostExtractor(); case "reqHdr": - return (e) -> e.getRequest().getHeaders().get(arg); + return (e) -> ((HttpRequest) e.getRequest()).getHeaders().get(arg); case "respHdr": - return (e) -> e.getResponse().getHeaders().get(arg); + return (e) -> ((HttpRequest) e.getResponse()).getHeaders().get(arg); default: throw new ParseException("Invalid extractor spec: " + spec); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,22 @@ +package com.passus.st; + +public class Protocols { + + public static final int UNKNOWN = 0; + public static final int HTTP = 1; + public static final int DNS = 2; + + private Protocols() { + } + + public static String protocolToString(int protocolId) { + switch (protocolId) { + case HTTP: + return "HTTP"; + case DNS: + return "DNS"; + default: + return "unknown"; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractClient.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,84 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.st.emitter.Emitter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public abstract class AbstractClient implements Client { + + private static final boolean DEFAULT_COLLECT_METRICS = false; + + private Emitter emitter; + + private final List<ClientListener> listeners = new ArrayList<>(); + + private FlowFilterChain filterChain = new FlowFilterChain(); + + private boolean collectMetrics = DEFAULT_COLLECT_METRICS; + + @Override + public Emitter getEmitter() { + return emitter; + } + + @Override + public void setEmitter(Emitter emitter) { + this.emitter = emitter; + } + + @Override + public void setListeners(Collection<ClientListener> listeners) { + Assert.notContainsNull(listeners, "listeners"); + this.listeners.addAll(listeners); + } + + @Override + public void addListener(ClientListener listener) { + Assert.notNull(listener, "listener"); + listeners.add(listener); + } + + @Override + public void removeListener(ClientListener listener) { + Assert.notNull(listener, "listener"); + listeners.remove(listener); + } + + @Override + public List<ClientListener> getListeners() { + return Collections.unmodifiableList(listeners); + } + + @Override + public List<FlowFilter> getFilters() { + return filterChain.getFilters(); + } + + @Override + public void addFilter(FlowFilter filter) { + Assert.notNull(filter, "filter"); + filterChain.addFilter(filter); + } + + @Override + public void setFilters(Collection<FlowFilter> filters) { + Assert.notContainsNull(filters, "filters"); + filterChain.clear(); + filters.forEach((filter) -> filterChain.addFilter(filter)); + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + 
public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlow.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,99 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.st.emitter.Emitter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public abstract class AbstractFlow implements Flow { + + protected final Logger logger = LogManager.getLogger(getClass()); + + private static final boolean DEFAULT_COLLECT_METRICS = false; + + protected Emitter emitter; + + private final List<ClientListener> listeners = new ArrayList<>(); + + private FlowFilterChain filterChain = new FlowFilterChain(); + + private boolean collectMetrics = DEFAULT_COLLECT_METRICS; + + protected ClientX client; + + @Override + public ClientX getClient() { + return client; + } + + @Override + public void setClient(ClientX client) { + this.client = client; + } + + @Override + public Emitter getEmitter() { + return emitter; + } + + @Override + public void setEmitter(Emitter emitter) { + this.emitter = emitter; + } + + @Override + public void setListeners(Collection<ClientListener> listeners) { + Assert.notContainsNull(listeners, "listeners"); + this.listeners.addAll(listeners); + } + + @Override + public void addListener(ClientListener listener) { + Assert.notNull(listener, "listener"); + listeners.add(listener); + } + + @Override + public void removeListener(ClientListener listener) { + Assert.notNull(listener, "listener"); + listeners.remove(listener); + } + + @Override + public List<ClientListener> getListeners() { + return Collections.unmodifiableList(listeners); + } + + @Override + public List<FlowFilter> getFilters() { + return filterChain.getFilters(); + } + + @Override + public void addFilter(FlowFilter filter) { + Assert.notNull(filter, "filter"); + 
filterChain.addFilter(filter); + } + + @Override + public void setFilters(Collection<FlowFilter> filters) { + Assert.notContainsNull(filters, "filters"); + filterChain.clear(); + filters.forEach((filter) -> filterChain.addFilter(filter)); + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/Client.java Wed May 29 12:32:29 2019 +0200 @@ -5,15 +5,32 @@ import com.passus.st.emitter.Emitter; import com.passus.st.metric.MetricSource; +import java.util.Collection; +import java.util.List; + /** * @author Mirosław Hawrot */ public interface Client extends EventHandler, MetricSource, Service, Configurable { - public Emitter getEmitter(); + Emitter getEmitter(); - public void setEmitter(Emitter emitter); + void setEmitter(Emitter emitter); - public void join() throws InterruptedException; + void join() throws InterruptedException; + + void setListeners(Collection<ClientListener> listeners); + + void addListener(ClientListener listener); + + void removeListener(ClientListener listener); + + List<ClientListener> getListeners(); + + List<FlowFilter> getFilters(); + + void addFilter(FlowFilter filter); + + void setFilters(Collection<FlowFilter> filters); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientX.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,24 @@ +package com.passus.st.client; + +import com.passus.config.Configurable; +import com.passus.st.metric.MetricSource; + +public interface ClientX extends MetricSource, Configurable { + + int getProtocolId(); + + void init(FlowContext flowContext); + + ClientXDataDecoder getResponseDecoder(FlowContext flowContext); + + ClientXDataEncoder getRequestEncoder(FlowContext flowContext); + + default void onDataWriteStart(FlowContext flowContext) { + + } + + default void onDataWriteEnd(FlowContext flowContext) { + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,19 @@ +package com.passus.st.client; + +import com.passus.data.ByteBuff; + +public interface ClientXDataDecoder<T> { + + T getResult(); + + int state(); + + String getLastError(); + + default void clear(FlowContext flowContext) { + + } + + int decode(ByteBuff buffer, FlowContext flowContext); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,9 @@ +package com.passus.st.client; + +import com.passus.data.ByteBuff; + +public interface ClientXDataEncoder<T> { + + void encode(T request, FlowContext flowContext, ByteBuff out); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,7 @@ +package com.passus.st.client; + +public interface ClientXFactory { + + ClientX create(int protocolId); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,18 @@ +package com.passus.st.client; + +import com.passus.st.client.http.HttpClientX; + +import static com.passus.st.Protocols.HTTP; + +public class ClientXFactoryImpl implements ClientXFactory { + + @Override + public ClientX create(int protocolId) { + switch (protocolId) { + case HTTP: + return new HttpClientX(); + } + + return null; + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/EventBusListener.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/EventBusListener.java Wed May 29 12:32:29 2019 +0200 @@ -6,9 +6,9 @@ */ public interface EventBusListener { - public void eventDropped(Event event); + void eventDropped(Event event); - public void eventQueued(Event event); + void eventQueued(Event event); - public void eventProcessed(Event event); + void eventProcessed(Event event); }
--- a/stress-tester/src/main/java/com/passus/st/client/EventHandler.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/EventHandler.java Wed May 29 12:32:29 2019 +0200 @@ -6,6 +6,6 @@ */ public interface EventHandler { - public void handle(Event event); + void handle(Event event); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/Flow.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,36 @@ +package com.passus.st.client; + +import com.passus.config.Configurable; +import com.passus.st.emitter.Emitter; +import com.passus.st.metric.MetricSource; + +import java.util.Collection; +import java.util.List; + +public interface Flow extends EventHandler, MetricSource, Configurable { + + ClientX getClient(); + + void setClient(ClientX client); + + Emitter getEmitter(); + + void setEmitter(Emitter emitter); + + void join() throws InterruptedException; + + void setListeners(Collection<ClientListener> listeners); + + void addListener(ClientListener listener); + + void removeListener(ClientListener listener); + + List<ClientListener> getListeners(); + + List<FlowFilter> getFilters(); + + void addFilter(FlowFilter filter); + + void setFilters(Collection<FlowFilter> filters); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowBased.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,4 @@ +package com.passus.st.client; + +public abstract class FlowBased extends AbstractFlow { +}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Wed May 29 12:32:29 2019 +0200 @@ -3,7 +3,6 @@ import com.passus.commons.Assert; import com.passus.data.ByteBuff; import com.passus.data.DataDecoder; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; @@ -42,6 +41,9 @@ private int loop; + private ClientX client; + + @Deprecated protected DataDecoder decoder; private Map<String, Object> params; @@ -58,6 +60,14 @@ this.channelContext = channelContext; } + public ClientX client() { + return client; + } + + public void client(ClientX client) { + this.client = client; + } + public void state(int state) { this.state = state; } @@ -98,6 +108,7 @@ this.sentEvent = sentEvent; } + @Deprecated public DataDecoder decoder() { return decoder; } @@ -126,7 +137,7 @@ return timeout != -1 && System.currentTimeMillis() > timeout; } - public void setSentEvent(HttpSessionPayloadEvent sentEvent) { + public void setSentEvent(SessionPayloadEvent sentEvent) { this.sentEvent = sentEvent; } @@ -180,6 +191,7 @@ public void clear() { buffer = null; sentEvent = null; + timeout = -1; if (params != null) { params.clear();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,73 @@ +package com.passus.st.client; + +import com.passus.st.emitter.SessionInfo; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static com.passus.st.client.FlowContext.STATE_DISCONNECTED; + +public class FlowUtils { + + public static final Map<Integer, Long> DEFAULT_TIMEOUTS; + + static { + Map<Integer, Long> defaultTimeouts = new HashMap<>(); + defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L); + defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L); + defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L); + defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L); + defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L); + defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L); + defaultTimeouts.put(STATE_DISCONNECTED, 0L); + DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts); + } + + private FlowUtils() { + } + + + public static void trace(Logger log, FlowContext flowContext, String message, Object... args) { + log(log, flowContext, Level.TRACE, message, args); + } + + public static void debug(Logger log, FlowContext flowContext, String message, Throwable cause) { + log(log, flowContext, Level.DEBUG, message, cause); + } + + public static void error(Logger log, FlowContext flowContext, String message, Throwable cause) { + log(log, flowContext, Level.ERROR, message, cause); + } + + public static void log(Logger log, FlowContext flowContext, Level level, String message, Throwable cause) { + message = String.format("%s [%s]", message, flowContext.sessionInfo()); + log.log(level, message, cause); + } + + public static final void debug(Logger log, FlowContext flowContext, String message, Object... 
args) { + log(log, flowContext, Level.DEBUG, message, args); + } + + public static void log(Logger log, FlowContext flowContext, Level level, String message, Object... args) { + if (args.length > 0) { + message = String.format(message, args); + } + + SessionInfo session = flowContext.sessionInfo(); + if (args.length == 0) { + log.log(level, message + " [{}]", session); + } else { + Object[] logArgs = new Object[args.length + 1]; + for (int i = 0; i < args.length; i++) { + logArgs[i] = args[i]; + } + + logArgs[logArgs.length - 1] = session; + log.log(level, message + " [{}]", logArgs); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,168 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; +import com.passus.st.client.http.HttpClientWorkerMetric; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricSource; +import com.passus.st.metric.MetricsContainer; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class FlowWorker extends Thread implements EmitterHandler, MetricSource, TimeAware { + + protected final Logger logger = LogManager.getLogger(getClass()); + + protected final int index; + + private ClientListener listener; + + protected FlowFilterChain filterChain = new FlowFilterChain(); + + protected final Emitter emitter; + + protected boolean collectMetric; + + protected HttpClientWorkerMetric metric; + + protected boolean connectPartialSession = false; + + protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + + protected ClientXFactory clientFactory; + + public FlowWorker(Emitter emitter, String name, int index) { + super(name + index); + Assert.notNull(emitter, "emitter"); + this.emitter = emitter; + this.index = index; + } + + public boolean isConnectPartialSession() { + return connectPartialSession; + } + + public void setConnectPartialSession(boolean connectPartialSession) { + this.connectPartialSession = connectPartialSession; + } + + public abstract boolean isWorking(); + + public int index() { + return index; + } + + public FlowFilterChain filterChain() { + return filterChain; + } + + public void setFilterChain(FlowFilterChain filterChain) { + Assert.notNull(filterChain, "filterChain"); + 
this.filterChain = filterChain; + } + + protected final void fireResponseReceived(Object request, Object response, FlowContext context) { + if (listener != null) { + listener.responseReceived(request, response, context); + } + } + + public ClientListener getListener() { + return listener; + } + + public void setListener(ClientListener listener) { + this.listener = listener; + } + + @Override + public boolean isCollectMetrics() { + return metric != null; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + if (collectMetrics && metric == null) { + metric = new HttpClientWorkerMetric(); + metric.activate(); + collectMetric = true; + } else if (!collectMetrics && metric != null) { + metric.deactivate(); + collectMetric = false; + metric = null; + } + } + + @Override + public TimeGenerator getTimeGenerator() { + return timeGenerator; + } + + @Override + public void setTimeGenerator(TimeGenerator timeGenerator) { + Assert.notNull(timeGenerator, "timeGenerator"); + this.timeGenerator = timeGenerator; + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetric) { + synchronized (metric) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + } + + public abstract int activeConnections(); + + public abstract void close(); + + public abstract void close(SessionInfo session); + + public abstract void handle(Event event); + + protected final void trace(FlowContext flowContext, String message, Object... 
args) { + log(flowContext, Level.TRACE, message, args); + } + + protected final void debug(FlowContext flowContext, String message, Throwable cause) { + log(flowContext, Level.DEBUG, message, cause); + } + + protected final void error(FlowContext flowContext, String message, Throwable cause) { + log(flowContext, Level.ERROR, message, cause); + } + + protected final void log(FlowContext flowContext, Level level, String message, Throwable cause) { + message = String.format("%s [%s]", message, flowContext.sessionInfo()); + logger.log(level, message, cause); + } + + protected final void debug(FlowContext flowContext, String message, Object... args) { + log(flowContext, Level.DEBUG, message, args); + } + + protected final void log(FlowContext flowContext, Level level, String message, Object... args) { + if (args.length > 0) { + message = String.format(message, args); + } + + SessionInfo session = flowContext.sessionInfo(); + if (args.length == 0) { + logger.log(level, message + " [{}]", session); + } else { + Object[] logArgs = new Object[args.length + 1]; + for (int i = 0; i < args.length; i++) { + logArgs[i] = args[i]; + } + + logArgs[logArgs.length - 1] = session; + logger.log(level, message + " [{}]", logArgs); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Wed May 29 12:32:29 2019 +0200 @@ -0,0 +1,563 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.data.HeapByteBuff; +import com.passus.filter.Filter; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricsContainer; +import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; +import it.unimi.dsi.fastutil.ints.Int2LongMap; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; + +public abstract class FlowWorkerBased extends FlowWorker { + + public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; + + protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); + + private final Set<SessionInfo> blockedSessions = new HashSet<>(); + + private final Int2LongMap timeouts = new Int2LongArrayMap(); + + protected final Object lock = new Object(); + + protected volatile boolean working = false; + + private long checkTimeoutsPeriod = 5_000; + + private long nextCheckTimeoutsTime = -1; + + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + + private long lastEventTimestamp = -1; + + public FlowWorkerBased(Emitter emitter, String name, int index) { + super(emitter, name, index); + timeouts.putAll(DEFAULT_TIMEOUTS); + } + + @Override + public boolean isWorking() { + return working; + } + + @Override + public int activeConnections() { + int count = 0; + synchronized (lock) { + for (FlowContext flowContext : sessions.values()) { + if (flowContext.state() != STATE_DISCONNECTED) { + count++; + } + } + } + + return count; + } + + protected final void 
addBlockedSession(SessionInfo session) { + blockedSessions.add(session); + } + + protected final boolean isBlockedSession(SessionInfo session) { + return !blockedSessions.isEmpty() && blockedSessions.contains(session); + } + + public float getSleepFactor() { + return sleepFactor; + } + + public void setSleepFactor(float sleepFactor) { + Assert.greaterOrEqualZero(sleepFactor, "sleepFactor"); + this.sleepFactor = sleepFactor; + } + + public long getCheckTimeoutsPeriod() { + return checkTimeoutsPeriod; + } + + public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) { + Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod"); + this.checkTimeoutsPeriod = checkTimeoutsPeriod; + } + + protected final void changeFlowState(FlowContext flowContext, int state) { + try { + if (flowContext.state() == state) { + return; + } + + int oldState = flowContext.state(); + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow status changing {} -> {}.", + contextStateToString(flowContext.state()), + contextStateToString(state) + ); + } + + switch (state) { + case FlowContext.STATE_CONNECTING: + flowContext.clear(); + break; + case FlowContext.STATE_CONNECTED: + flowContext.client().init(flowContext); + flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); + break; + case FlowContext.STATE_ERROR: + changeFlowState(flowContext, STATE_DISCONNECTED); + break; + case FlowContext.STATE_RESP_RECEIVED: + flowContext.sentEvent(null); + flowContext.receivedStartTimestamp(-1); + break; + case FlowContext.STATE_DISCONNECTING: + if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { + if (flowContext.channelContext() != null) { + try { + flowContext.channelContext().close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } else { + changeFlowState(flowContext, STATE_DISCONNECTED); + } + } else { + return; + } + break; + case STATE_DISCONNECTED: + flowContext.state(STATE_DISCONNECTED); + 
flowContext.clear(); + removeFlowContext(flowContext); + flowStateChanged(flowContext, oldState); + return; + } + + long timeout = timeouts.get(flowContext.state()); + flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); + flowContext.state(state); + flowStateChanged(flowContext, oldState); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + protected void flowStateChanged(FlowContext context, int oldState) { + + } + + protected FlowContext flowContext(SessionEvent event) { + return flowContext(event.getSessionInfo()); + } + + protected FlowContext flowContext(ChannelContext context) { + return flowContext(context.getSessionInfo()); + } + + protected FlowContext flowContext(SessionInfo session) { + FlowContext context = sessions.get(session); + if (context == null) { + if (logger.isDebugEnabled()) { + logger.debug("Context for session '" + session + "' not found."); + } + } + + return context; + } + + protected FlowContext createFlowContext(SessionInfo session) { + return new FlowContext(session); + } + + protected FlowContext register(SessionEvent sessionEvent) { + return register(sessionEvent.getSessionInfo()); + } + + protected FlowContext register(SessionInfo session) { + synchronized (lock) { + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. 
Session already registered."); + return null; + } + + FlowContext flowContext = createFlowContext(session); + sessions.put(session, flowContext); + return flowContext; + } + } + + protected FlowContext connect(SessionEvent sessionEvent) { + return connect(sessionEvent.getSessionInfo()); + } + + protected FlowContext connect(SessionInfo session) { + synchronized (lock) { + try { + FlowContext flowContext = register(session); + if (flowContext != null) { + emitter.connect(session, this, index); + return flowContext; + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + return null; + } + } + + @Override + public void close() { + synchronized (lock) { + for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { + FlowContext flowContext = entry.getValue(); + try { + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + } + + sessions.clear(); + working = false; + } + } + + protected void close(SessionEvent sessionEvent) { + close(sessionEvent.getSessionInfo()); + } + + protected void close(FlowContext flowContext) { + synchronized (lock) { + try { + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + @Override + public void close(SessionInfo session) { + synchronized (lock) { + try { + FlowContext flowContext = flowContext(session); + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + protected void closeSession(FlowContext flowContext) { + synchronized (lock) { + if (flowContext != null) { + changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); + } + } + } + + protected void removeFlowContext(FlowContext flowContext) { + synchronized (lock) { + debug(flowContext, "removeFlowContext"); + sessions.remove(flowContext.sessionInfo()); + } + } + + protected void 
reconnect(FlowContext flowContext) { + synchronized (lock) { + try { + if (logger.isDebugEnabled()) { + debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state())); + } + + SessionInfo session = flowContext.sessionInfo(); + changeFlowState(flowContext, FlowContext.STATE_CONNECTING); + emitter.connect(session, this, index); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); + } + } + } + + protected void closeAllConnections() { + synchronized (lock) { + for (FlowContext flowContext : sessions.values()) { + closeSession(flowContext); + } + } + } + + private void sleepSilently(long millis) { + if (millis == 0) { + return; + } + if (millis < 0) { + logger.warn("Cannot sleep for negative interval: {}."); + return; + } + logger.debug("Going sleep for: {}.", millis); + try { + Thread.sleep(millis); + } catch (InterruptedException ignore) { + } + } + + protected void sleep(Event event) { + if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (lastEventTimestamp != -1) { + long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + sleepSilently(timeToSleep); + } + lastEventTimestamp = event.getTimestamp(); + } + } + + @Override + public void writeMetrics(MetricsContainer container) { + synchronized (lock) { + super.writeMetrics(container); + } + } + + @Override + public void channelActive(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + + flowContext.channelContext(context); + changeFlowState(flowContext, STATE_CONNECTED); + } + + lock.notifyAll(); + } + } + + @Override + public void channelInactive(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != 
null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } + + changeFlowState(flowContext, STATE_DISCONNECTED); + } + lock.notifyAll(); + } + } + + @Override + public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + try { + if (flowContext != null) { + ClientX client = flowContext.client(); + ClientXDataDecoder decoder = client.getResponseDecoder(flowContext); + decoder.decode(data, flowContext); + long now = timeGenerator.currentTimeMillis(); + if (flowContext.receivedStartTimestamp() == -1) { + flowContext.receivedStartTimestamp(now); + } + + if (decoder.state() == DataDecoder.STATE_ERROR) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Decoder error. " + decoder.getLastError()); + } + + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + } else if (decoder.state() == DataDecoder.STATE_FINISHED) { + Object resp = decoder.getResult(); + Object req = null; + if (flowContext.sentEvent() != null) { + req = flowContext.sentEvent().getRequest(); + } + + if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { + try { + fireResponseReceived(req, resp, flowContext); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); + } + } + + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + + lock.notifyAll(); + } + } + + @Override + public void dataWriteStart(ChannelContext context) { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null && flowContext.sentEvent() != null) { + long now = timeGenerator.currentTimeMillis(); + flowContext.sendStartTimestamp(now); + flowContext.client().onDataWriteStart(flowContext); + } + } + } + + @Override + public void 
dataWritten(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null && flowContext.sentEvent() != null) { + long now = timeGenerator.currentTimeMillis(); + if (collectMetric) { + synchronized (metric) { + metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); + } + } + + flowContext.client().onDataWriteEnd(flowContext); + } + + lock.notifyAll(); + + } + } + + @Override + public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Error occured. " + cause.getMessage(), cause); + } + + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + changeFlowState(flowContext, FlowContext.STATE_ERROR); + } + + lock.notifyAll(); + } + } + + protected boolean send(FlowContext flowContext, SessionPayloadEvent event) { + synchronized (lock) { + Object req = event.getRequest(); + if (req != null) { + if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { + return false; + } + + ClientX client = flowContext.client(); + ClientXDataEncoder encoder = client.getRequestEncoder(flowContext); + ByteBuff buffer = flowContext.buffer(); + encoder.encode(req, flowContext, buffer); + + if (collectMetric) { + synchronized (metric) { + metric.incRequestsNum(); + metric.addRequestSize(flowContext.buffer().readableBytes()); + } + } + + try { + changeFlowState(flowContext, FlowContext.STATE_REQ_SENT); + flowContext.sentEvent(event); + flowContext.channelContext().writeAndFlush(buffer); + buffer.clear(); + return true; + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + } + } + + return false; + } + + protected void processTimeouts() { + synchronized (lock) { + try { + long now = timeGenerator.currentTimeMillis(); + if (nextCheckTimeoutsTime == -1) { + nextCheckTimeoutsTime = now + 
checkTimeoutsPeriod; + } else if (nextCheckTimeoutsTime > now) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + for (FlowContext flowContext : sessions.values()) { + if (flowContext.timeouted()) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow for session '{}' timed out (state '{}').", + flowContext.sessionInfo(), + contextStateToString(flowContext.state())); + } + + switch (flowContext.state()) { + case FlowContext.STATE_CONNECTING: + case FlowContext.STATE_CONNECTED: + case FlowContext.STATE_REQ_SENT: + case FlowContext.STATE_ERROR: + closeSession(flowContext); + break; + case FlowContext.STATE_RESP_RECEIVED: + //Dziwny blad nie powinien wystepowac + break; + case FlowContext.STATE_DISCONNECTING: + case STATE_DISCONNECTED: + removeFlowContext(flowContext); + break; + } + } + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + protected Event eventInstanceForWorker(Event event) { + if (event instanceof SessionEvent) { + Event newEvent = ((SessionEvent) event).instanceForWorker(index); + newEvent.setTimestamp(event.getTimestamp()); + return newEvent; + } else { + return event; + } + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,58 @@
+package com.passus.st.client;
+
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+/**
+ * {@link FlowWorker} whose overridden operations do nothing: it reports itself as not
+ * working, reports zero active connections and ignores every event and close request.
+ */
+public class NullFlowWorker extends FlowWorker {
+
+    /**
+     * Passes all arguments straight to the {@link FlowWorker} constructor.
+     */
+    public NullFlowWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isWorking() {
+        return false;
+    }
+
+    /**
+     * @return always {@code 0}
+     */
+    @Override
+    public int activeConnections() {
+        return 0;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Does nothing; the session is ignored.
+     */
+    @Override
+    public void close(SessionInfo session) {
+
+    }
+
+    /**
+     * Does nothing; the event is discarded.
+     */
+    @Override
+    public void handle(Event event) {
+
+    }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,378 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Flow worker that serves many sessions concurrently.
+ * <p>
+ * Events passed to {@link #handle(Event)} are placed on a worker-wide queue that
+ * {@link #run()} drains. A payload event is sent at once when its flow is idle;
+ * otherwise it waits in that flow's own queue until the flow changes state. A semaphore
+ * sized by {@code maxSentRequests} limits how many payload events are accepted before a
+ * flow leaves {@code STATE_REQ_SENT}.
+ */
+public class ParallelFlowWorker extends FlowWorkerBased {
+
+    public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
+
+    /** Events accepted by handle(Event) and not yet consumed by run(). */
+    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
+
+    private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS;
+
+    private long eventsQueueWaitTime = 100;
+
+    /** One permit is taken per accepted payload event, returned when a flow leaves STATE_REQ_SENT. */
+    private final Semaphore semaphore = new Semaphore(maxSentRequests);
+
+    /** Known flow contexts; a flow moves to the head whenever an event is queued on it. */
+    private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>();
+
+    /** True while waitCloseAllConnections() is waiting for every flow to go away. */
+    private boolean closeAllConnections = false;
+
+    public ParallelFlowWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    public int getMaxSentRequests() {
+        return maxSentRequests;
+    }
+
+    /**
+     * Sets the limit used both for throttling accepted payload events and for capping
+     * the number of open flows.
+     *
+     * @param maxSentRequests new limit, must be greater than zero
+     */
+    public void setMaxSentRequests(int maxSentRequests) {
+        Assert.greaterThanZero(maxSentRequests, "maxSentRequests");
+        int delta = maxSentRequests - this.maxSentRequests;
+        this.maxSentRequests = maxSentRequests;
+
+        // The semaphore was sized from the previous limit, so move it to the new one;
+        // otherwise the setter would never affect throttling.
+        if (delta > 0) {
+            semaphore.release(delta);
+        } else {
+            // Best effort: only permits that are currently free can be withdrawn.
+            while (delta < 0 && semaphore.tryAcquire()) {
+                delta++;
+            }
+        }
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(SessionInfo session) {
+        return (LocalFlowContext) super.flowContext(session);
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(ChannelContext context) {
+        return flowContext(context.getSessionInfo());
+    }
+
+    @Override
+    protected LocalFlowContext flowContext(SessionEvent event) {
+        return flowContext(event.getSessionInfo());
+    }
+
+    public long getEventsQueueWaitTime() {
+        return eventsQueueWaitTime;
+    }
+
+    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
+        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
+        this.eventsQueueWaitTime = eventsQueueWaitTime;
+    }
+
+    /**
+     * Forgets the flow both in the local index and in the session registry kept by the
+     * base class.
+     */
+    @Override
+    protected void removeFlowContext(FlowContext flowContext) {
+        if (flowContext != null) {
+            flowIndex.remove(flowContext);
+            // Without this the session stays registered forever, so a later event for
+            // the same session would find the dead context instead of reconnecting.
+            super.removeFlowContext(flowContext);
+        }
+    }
+
+    @Override
+    protected LocalFlowContext createFlowContext(SessionInfo session) {
+        LocalFlowContext flowContext = new LocalFlowContext(session);
+        flowIndex.add(flowContext);
+        return flowContext;
+    }
+
+    /**
+     * Blocks until the flow index is empty. While waiting, flowStateChanged closes
+     * every flow that becomes idle.
+     */
+    private void waitCloseAllConnections() {
+        // NOTE(review): flows that are already idle get no state change and therefore are
+        // not closed here - confirm that something else eventually removes them.
+        closeAllConnections = true;
+        synchronized (lock) {
+            while (!flowIndex.isEmpty()) {
+                try {
+                    lock.wait(10);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        }
+
+        closeAllConnections = false;
+    }
+
+    /**
+     * Sends the event on the given flow. Before sending, closes idle flows when more
+     * flows are open than {@code maxSentRequests} allows.
+     */
+    @Override
+    protected boolean send(FlowContext flowContext, SessionPayloadEvent event) {
+        // Check whether there are too many connections. If so, close the least
+        // recently used ones.
+        if (flowIndex.size() > maxSentRequests) {
+            int diff = flowIndex.size() - maxSentRequests;
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Too many connections {}.", flowIndex.size());
+            }
+
+            // Walk a snapshot from the tail (least recently used): closing a flow may
+            // end up modifying flowIndex.
+            Iterator<LocalFlowContext> it = new ArrayDeque<LocalFlowContext>(flowIndex).descendingIterator();
+            while (it.hasNext()) {
+                LocalFlowContext indexFlowContext = it.next();
+                // Close the idle flow found in the index - never the flow this request
+                // is about to be written to, and not one that is already going away.
+                if (indexFlowContext != flowContext
+                        && indexFlowContext.state() < FlowContext.STATE_DISCONNECTING
+                        && indexFlowContext.eventsQueue.isEmpty()
+                        && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                    close(indexFlowContext);
+                    if (--diff == 0) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        return super.send(flowContext, event);
+    }
+
+    private boolean canSend(FlowContext flowContext) {
+        int state = flowContext.state();
+        return (state == FlowContext.STATE_CONNECTED
+                || state == FlowContext.STATE_RESP_RECEIVED
+                || state == FlowContext.STATE_ERROR
+                || state == FlowContext.STATE_REQ_SENT);
+    }
+
+    /**
+     * Reacts to a flow state change: returns a permit once a request is no longer in
+     * flight, closes the flow if connections are being drained and it is idle, and
+     * otherwise serves the next event waiting in the flow's queue.
+     */
+    @Override
+    protected void flowStateChanged(FlowContext flowContext, int oldState) {
+        LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
+        if (oldState == FlowContext.STATE_REQ_SENT) {
+            // Never hand out more permits than the configured limit.
+            if (semaphore.availablePermits() < maxSentRequests) {
+                semaphore.release();
+            }
+        }
+
+        if (closeAllConnections) {
+            if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
+                    && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                    && localFlowContext.eventsQueue.isEmpty()) {
+                close(flowContext);
+                return;
+            }
+        }
+
+        if (localFlowContext.state() >= FlowContext.STATE_CONNECTED
+                && localFlowContext.state() < FlowContext.STATE_DISCONNECTING
+                && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                && !localFlowContext.eventsQueue.isEmpty()) {
+
+            Event event = localFlowContext.eventsQueue.peek();
+            if (event.getType() == SessionStatusEvent.TYPE) {
+                SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                    localFlowContext.eventsQueue.poll();
+                    close((SessionStatusEvent) event);
+                }
+            } else if (event.getType() == SessionPayloadEvent.TYPE
+                    && canSend(flowContext)) {
+                localFlowContext.eventsQueue.poll();
+                send(flowContext, (SessionPayloadEvent) event);
+            } else {
+                localFlowContext.eventsQueue.poll();
+            }
+        }
+    }
+
+    private void makeFirst(LocalFlowContext flowContext) {
+        synchronized (lock) {
+            flowIndex.remove(flowContext);
+            flowIndex.addFirst(flowContext);
+        }
+    }
+
+    private void addToQueue(LocalFlowContext flowContext, Event event) {
+        flowContext.eventsQueue.add(event);
+        makeFirst(flowContext);
+    }
+
+    /**
+     * Queues the event for {@link #run()}. For a payload event this first takes a semaphore
+     * permit, blocking the caller while none is available.
+     */
+    @Override
+    public void handle(Event event) {
+        Event newEvent = null;
+        switch (event.getType()) {
+            case SessionPayloadEvent.TYPE:
+                semaphore.acquireUninterruptibly();
+                newEvent = eventInstanceForWorker(event);
+                break;
+            case SessionStatusEvent.TYPE:
+            case DataEvents.DataLoopEnd.TYPE:
+            case DataEvents.DataEnd.TYPE:
+                newEvent = event;
+        }
+
+        if (newEvent != null) {
+            synchronized (lock) {
+                try {
+                    eventsQueue.put(newEvent);
+                    lock.notifyAll();
+                } catch (Exception e) {
+                    logger.debug("Unable to add event to queue. " + e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Processes one event taken from the worker-wide queue.
+     */
+    private void processEvent(Event event) {
+        if (event instanceof SessionEvent) {
+            switch (event.getType()) {
+                case SessionStatusEvent.TYPE:
+                    SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                        LocalFlowContext flowContext = flowContext((SessionEvent) event);
+                        if (flowContext != null) {
+                            if (flowContext.eventsQueue.isEmpty()
+                                    && flowContext.state() != FlowContext.STATE_REQ_SENT) {
+                                close(statusEvent);
+                            } else {
+                                addToQueue(flowContext, event);
+                            }
+                        }
+                    }
+                    break;
+                case SessionPayloadEvent.TYPE: {
+                    SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
+                    LocalFlowContext flowContext = flowContext(payloadEvent);
+                    if (flowContext != null) {
+                        if (flowContext.state() >= FlowContext.STATE_CONNECTING
+                                && flowContext.state() < FlowContext.STATE_DISCONNECTING) {
+                            if (flowContext.eventsQueue.isEmpty()
+                                    && (flowContext.state() == FlowContext.STATE_CONNECTED
+                                    || flowContext.state() == FlowContext.STATE_ERROR
+                                    || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) {
+                                send(flowContext, payloadEvent);
+                            } else {
+                                addToQueue(flowContext, event);
+                            }
+                        }
+                    } else {
+                        try {
+                            SessionInfo session = payloadEvent.getSessionInfo();
+                            flowContext = (LocalFlowContext) register(session);
+                            addToQueue(flowContext, event);
+                            emitter.connect(session, this, index);
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+
+                    break;
+                }
+            }
+
+        } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataLoopEnd received.");
+            }
+
+            waitCloseAllConnections();
+            filterChain.reset();
+        } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("DataEnd received. Deactivation.");
+            }
+
+            working = false;
+        }
+
+    }
+
+    /**
+     * Worker loop: waits on the lock for up to {@code eventsQueueWaitTime} ms, then processes
+     * every queued event, until {@code working} becomes false.
+     */
+    @Override
+    public void run() {
+        synchronized (lock) {
+            working = true;
+            while (working) {
+                try {
+                    try {
+                        lock.wait(eventsQueueWaitTime);
+                    } catch (InterruptedException ignore) {
+                    }
+
+                    Event event;
+                    while ((event = eventsQueue.poll()) != null) {
+                        processEvent(event);
+                    }
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Flow context extended with a queue of events waiting to be served on that flow.
+     */
+    protected static class LocalFlowContext extends FlowContext {
+
+        private final Queue<Event> eventsQueue;
+
+        private LocalFlowContext(SessionInfo session) {
+            super(session);
+            eventsQueue = new LinkedList<>();
+        }
+
+    }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ReqRespPair.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,37 @@
+package com.passus.st.client;
+
+/**
+ * Holder of a request and the response paired with it; both references are final.
+ *
+ * @param <R> request type
+ * @param <S> response type
+ */
+public class ReqRespPair<R, S> {
+
+    // Both fields are package-private and assigned exactly once, in the constructor.
+    final R request;
+    final S response;
+
+    /**
+     * @param request  value returned by {@link #getRequest()}
+     * @param response value returned by {@link #getResponse()}
+     */
+    public ReqRespPair(R request, S response) {
+        this.request = request;
+        this.response = response;
+    }
+
+    /**
+     * @return the request passed to the constructor
+     */
+    public R getRequest() {
+        return request;
+    }
+
+    /**
+     * @return the response passed to the constructor
+     */
+    public S getResponse() {
+        return response;
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Wed May 29 12:32:29 2019 +0200
@@ -3,20 +3,54 @@
 import com.passus.st.emitter.SessionInfo;
 
 /**
- *
+ * Session event carrying a request/response pair together with the id of the protocol
+ * they belong to.
+ *
+ * @param <R> request type
+ * @param <S> response type
  * @author Mirosław Hawrot
  */
-public abstract class SessionPayloadEvent<T> extends SessionEvent {
+public class SessionPayloadEvent<R, S> extends SessionEvent {
 
-    private final T payload;
+    /** Value returned by {@link #getType()}. */
+    public static final int TYPE = 12;
 
-    public SessionPayloadEvent(SessionInfo sessionInfo, T payload, String sourceName) {
+    private final R request;
+
+    private final S response;
+
+    private final int protocolId;
+
+    public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName) {
         super(sessionInfo, sourceName);
-        this.payload = payload;
+        this.request = request;
+        this.response = response;
+        this.protocolId = protocolId;
     }
 
-    public T getPayload() {
-        return payload;
+    @Override
+    public int getType() {
+        return TYPE;
     }
 
+    public int getProtocolId() {
+        return protocolId;
+    }
+
+    public R getRequest() {
+        return request;
+    }
+
+    public S getResponse() {
+        return response;
+    }
+
+    /**
+     * Creates a copy of this event holding the same session info, request, response,
+     * protocol id and source name. The {@code index} argument is not used.
+     */
+    @Override
+    public SessionPayloadEvent<R, S> instanceForWorker(int index) {
+        return new SessionPayloadEvent<>(getSessionInfo(), request, response, protocolId, getSourceName());
+    }
 }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,319 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.passus.st.client.FlowContext.contextStateToString;
+
+/**
+ * Flow worker that processes events one at a time: after an event starts an operation on
+ * a flow (connect, send), no further event is taken from the queue until that flow
+ * reaches a state that ends the operation.
+ * <p>
+ * Events are queued by {@link #handle(Event)} and consumed by {@link #run()}.
+ */
+public class SynchFlowWorker extends FlowWorkerBased {
+
+    public static final String TYPE = "synch";
+
+    private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
+
+    private long eventsQueueWaitTime = 100;
+
+    /**
+     * Context for which an operation is currently being performed.
+     */
+    private FlowContext currFlowContext;
+
+    /** True while a DataLoopEnd event is being handled; run() stops polling meanwhile. */
+    private boolean loopEnd = false;
+
+    /** Number of the current data loop; stored on every flow context created during it. */
+    private int loop = 0;
+
+    public SynchFlowWorker(Emitter emitter, String name, int index) {
+        super(emitter, name, index);
+    }
+
+    @Override
+    public boolean isWorking() {
+        return working;
+    }
+
+    public long getEventsQueueWaitTime() {
+        return eventsQueueWaitTime;
+    }
+
+    public void setEventsQueueWaitTime(long eventsQueueWaitTime) {
+        Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime");
+        this.eventsQueueWaitTime = eventsQueueWaitTime;
+    }
+
+    /**
+     * Starts disconnecting the flow of the invalidated session, if one exists, and adds the
+     * session to the blocked sessions so that its later events are skipped by pollNext().
+     */
+    @Override
+    public void sessionInvalidated(SessionInfo session) throws Exception {
+        synchronized (lock) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Session {} invalidated.", session);
+            }
+
+            FlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING);
+            }
+
+            addBlockedSession(session);
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Releases the current flow context once it reaches a state in which the pending
+     * operation is over, allowing pollNext() to take the next event.
+     */
+    @Override
+    protected void flowStateChanged(FlowContext context, int oldState) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state()));
+        }
+
+        if (context == currFlowContext) {
+            if (context.state() == FlowContext.STATE_CONNECTED
+                    || context.state() == FlowContext.STATE_RESP_RECEIVED
+                    || context.state() == FlowContext.STATE_ERROR
+                    || context.state() == FlowContext.STATE_DISCONNECTED) {
+                currFlowContext = null;
+            }
+        }
+    }
+
+    @Override
+    public void handle(Event event) {
+        Event newEvent = eventInstanceForWorker(event);
+        synchronized (lock) {
+            try {
+                eventsQueue.put(newEvent);
+            } catch (Exception e) {
+                logger.debug("Unable to add event to queue. " + e.getMessage(), e);
+            }
+
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Waits until no flow is in STATE_REQ_SENT, asks the base class to close all
+     * connections, then waits until the session registry is empty.
+     */
+    @Override
+    protected void closeAllConnections() {
+        synchronized (lock) {
+            boolean wait;
+            do {
+                wait = false;
+                for (FlowContext flowContext : sessions.values()) {
+                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
+                        wait = true;
+                        break;
+                    }
+                }
+
+                if (wait) {
+                    try {
+                        lock.wait(100);
+                    } catch (Exception e) {
+                    }
+                }
+            } while (wait);
+
+            super.closeAllConnections();
+            while (!sessions.isEmpty()) {
+                try {
+                    lock.wait(100);
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    /**
+     * Takes the next event from the queue and processes it.
+     *
+     * @return true if the next event should be processed immediately
+     */
+    private boolean pollNext() {
+        if (currFlowContext != null) {
+            return false;
+        }
+
+        Event event = eventsQueue.poll();
+        if (event != null) {
+            sleep(event);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Event processing: {}", event);
+            }
+
+            if (event instanceof SessionEvent) {
+                SessionEvent sessEvent = (SessionEvent) event;
+                if (isBlockedSession(sessEvent.getSessionInfo())) {
+                    return true;
+                }
+
+                if (event.getType() == SessionStatusEvent.TYPE) {
+                    SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
+                    if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
+                        try {
+                            currFlowContext = register(statusEvent);
+                            if (currFlowContext != null) {
+                                currFlowContext.loop(loop);
+                                emitter.connect(statusEvent.getSessionInfo(), this, index);
+                            }
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                        }
+
+                        return (currFlowContext == null);
+                    } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
+                        currFlowContext = flowContext((SessionEvent) event);
+                        if (currFlowContext != null) {
+                            if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                                close(statusEvent);
+                            }
+                        }
+                    }
+
+                    return true;
+                } else if (event.getType() == SessionPayloadEvent.TYPE) {
+                    FlowContext flowContext = flowContext(sessEvent);
+                    if (flowContext != null) {
+                        switch (flowContext.state()) {
+                            case FlowContext.STATE_CONNECTED:
+                            case FlowContext.STATE_RESP_RECEIVED:
+                            case FlowContext.STATE_ERROR:
+                                currFlowContext = flowContext;
+                                if (send(flowContext, (SessionPayloadEvent) event)) {
+                                    return false;
+                                } else {
+                                    currFlowContext = null;
+                                    return true;
+                                }
+                            case FlowContext.STATE_DISCONNECTING:
+                            case FlowContext.STATE_DISCONNECTED:
+                                if (connectPartialSession) {
+                                    currFlowContext = register(sessEvent);
+                                    if (currFlowContext != null) {
+                                        try {
+                                            currFlowContext.loop(loop);
+                                            emitter.connect(sessEvent.getSessionInfo(), this, index);
+                                        } catch (IOException e) {
+                                            logger.error(e.getMessage(), e);
+                                            currFlowContext = null;
+                                        }
+                                    }
+
+                                    return false;
+                                } else {
+                                    return true;
+                                }
+                            default:
+                                return false;
+                        }
+                    } else if (connectPartialSession) {
+                        currFlowContext = register(sessEvent);
+                        if (currFlowContext != null) {
+                            try {
+                                currFlowContext.loop(loop);
+                                emitter.connect(sessEvent.getSessionInfo(), this, index);
+                                eventsQueue.addFirst(sessEvent);
+                            } catch (IOException e) {
+                                logger.error(e.getMessage(), e);
+                                currFlowContext = null;
+                            }
+
+                            return false;
+                        } else {
+                            return true;
+                        }
+                    }
+
+                    return true;
+                } else {
+                    return true;
+                }
+            } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataLoopEnd received.");
+                }
+
+                loopEnd = true;
+                try {
+                    closeAllConnections();
+                    filterChain.reset();
+                    // currFlowContext is always null here (see the guard at the top), so
+                    // advance the worker's own counter instead of reading it from a flow.
+                    loop++;
+                } finally {
+                    // Must be reset even on failure, otherwise run() never polls again.
+                    loopEnd = false;
+                }
+                return true;
+            } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DataEnd received. Deactivation.");
+                }
+
+                working = false;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            eventsQueue.clear();
+            super.close();
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Worker loop: waits on the lock for up to eventsQueueWaitTime ms, then keeps polling
+     * events for as long as pollNext() asks for the next one.
+     */
+    @Override
+    public void run() {
+        synchronized (lock) {
+            working = true;
+            while (working) {
+                try {
+                    try {
+                        lock.wait(eventsQueueWaitTime);
+                    } catch (InterruptedException ignore) {
+                    }
+
+                    boolean nextPoll;
+                    do {
+                        if (loopEnd || !working) {
+                            break;
+                        }
+
+                        nextPoll = pollNext();
+                    } while (nextPoll);
+                } catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -6,7 +6,10 @@ import com.passus.net.http.HttpResponse; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.*; +import com.passus.st.client.Event; +import com.passus.st.client.FlowContext; +import com.passus.st.client.SessionEvent; +import com.passus.st.client.SessionStatusEvent; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; @@ -20,6 +23,7 @@ /** * @author Mirosław Hawrot */ +@Deprecated @Plugin(name = HttpAsynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) public class HttpAsynchClientWorker extends HttpFlowBasedClientWorker { @@ -386,15 +390,22 @@ } } - private static final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { + private static final class HttpResponseEvent extends SessionEvent { public static final int TYPE = 1012; + private final HttpResponse payload; + public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) { - super(sessionInfo, payload, sourceName); + super(sessionInfo, sourceName); + this.payload = payload; setTimestamp(timestamp); } + public HttpResponse getPayload() { + return payload; + } + @Override public SessionEvent instanceForWorker(int index) { throw new UnsupportedOperationException("Not supported yet.");
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -20,10 +20,10 @@ import java.util.List; /** - * * @author Mirosław Hawrot */ -public abstract class HttpClientWorker extends Thread implements EmitterHandler, MetricSource, Runnable { +@Deprecated +public abstract class HttpClientWorker extends Thread implements EmitterHandler, MetricSource { public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; @@ -97,7 +97,7 @@ Assert.notNull(listener, "listener"); this.listeners.remove(listener); } - + @Override public boolean isCollectMetrics() { return metric != null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java Wed May 29 12:32:29 2019 +0200 @@ -3,13 +3,12 @@ import com.passus.st.client.Event; /** - * * @author Mirosław Hawrot */ public interface HttpClientWorkerDispatcher { - - public abstract HttpClientWorker find(Event event, HttpClientWorker[] workers); - public abstract int dispatch(Event event, HttpClientWorker[] workers); - + HttpClientWorker find(Event event, HttpClientWorker[] workers); + + int dispatch(Event event, HttpClientWorker[] workers); + }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java Wed May 29 12:32:29 2019 +0200 @@ -1,15 +1,12 @@ package com.passus.st.client.http; import com.passus.commons.plugin.PluginFactory; -import com.passus.st.client.http.filter.HttpFilter; -import com.passus.st.client.http.filter.HttpFilterFactory; import com.passus.st.plugin.PluginConstants; /** - * * @author Mirosław Hawrot */ -public class HttpClientWorkerFactory extends PluginFactory<HttpClientWorker> { +public class HttpClientWorkerFactory extends PluginFactory<HttpClientWorker> { private static HttpClientWorkerFactory instance;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,107 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.net.http.HttpRequest;
+import com.passus.st.client.ClientX;
+import com.passus.st.client.ClientXDataDecoder;
+import com.passus.st.client.ClientXDataEncoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_END;
+import static com.passus.st.client.http.HttpConsts.TAG_TIME_START;
+
+/**
+ * HTTP implementation of {@link ClientX}. Owns one response decoder and one request
+ * encoder, which are returned for every flow, and stamps the sent request with the
+ * times at which writing started and ended.
+ */
+public class HttpClientX implements ClientX, TimeAware {
+
+    private final HttpClientXDataDecoder decoder;
+
+    private final HttpClientXDataEncoder encoder;
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    // NOTE(review): never assigned in this class, yet HttpClientXDataDecoder locks on it when
+    // metrics collection is enabled - confirm where it is meant to be created.
+    HttpClientWorkerMetric metric;
+
+    public HttpClientX() {
+        decoder = new HttpClientXDataDecoder(this);
+        encoder = new HttpClientXDataEncoder();
+    }
+
+    @Override
+    public int getProtocolId() {
+        return HTTP;
+    }
+
+    @Override
+    public ClientXDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return decoder;
+    }
+
+    @Override
+    public ClientXDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
+    /** Stores a new HttpFlowContext on the flow under PARAM_HTTP_CONTEXT. */
+    @Override
+    public void init(FlowContext flowContext) {
+        //TODO Fix: HttpScopes holds global parameters
+        flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes()));
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+
+    }
+
+    /** Tags the request of the flow's sent event with the current time as TAG_TIME_START. */
+    @Override
+    public void onDataWriteStart(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now);
+    }
+
+    /** Tags the request of the flow's sent event with the current time as TAG_TIME_END. */
+    @Override
+    public void onDataWriteEnd(FlowContext flowContext) {
+        long now = timeGenerator.currentTimeMillis();
+        ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now);
+    }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,123 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.http.HttpFullMessageDecoder;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.ClientXDataDecoder;
+import com.passus.st.client.FlowContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+import static com.passus.st.client.http.HttpConsts.*;
+import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext;
+
+/**
+ * Decodes HTTP responses for {@link HttpClientX}. On a completely decoded response it
+ * logs it, updates the client's metrics when they are collected, and tags the response
+ * with its sizes and receive times.
+ */
+public class HttpClientXDataDecoder implements ClientXDataDecoder<HttpResponse> {
+
+    private static final Logger LOGGER = LogManager.getLogger(HttpClientXDataDecoder.class);
+
+    private final HttpFullMessageDecoder decoder;
+
+    private final HttpClientX client;
+
+    // NOTE(review): HttpClientX hands out this single decoder for every flow, so this
+    // field is shared between flows - confirm flows are never decoded concurrently.
+    private HttpRequest lastRequest;
+
+    public HttpClientXDataDecoder(HttpClientX client) {
+        this.client = client;
+        decoder = new HttpFullMessageDecoder();
+        decoder.setDecodeRequest(false);
+    }
+
+    @Override
+    public HttpResponse getResult() {
+        return (HttpResponse) decoder.getResult();
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    @Override
+    public void clear(FlowContext flowContext) {
+        if (lastRequest != null) {
+            extractHttpContext(flowContext).scopes().removeConversation(lastRequest);
+            lastRequest = null;
+        }
+
+        decoder.clear();
+    }
+
+    /**
+     * Feeds the buffer to the HTTP decoder. When the response is complete, records
+     * metrics (if enabled) and tags the response with header/content size and with the
+     * receive start and end timestamps.
+     *
+     * @return the value returned by the underlying decoder
+     */
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        if (flowContext.sentEvent() != null) {
+            lastRequest = (HttpRequest) flowContext.sentEvent().getRequest();
+        }
+
+        if (lastRequest != null) {
+            decoder.setRequestMethod(lastRequest.getMethod());
+        }
+
+        int res = decoder.decode(buffer);
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            long now = client.timeGenerator.currentTimeMillis();
+            HttpResponse resp = (HttpResponse) decoder.getResult();
+
+            if (LOGGER.isDebugEnabled()) {
+                debug(LOGGER, flowContext,
+                        "Response decoded (size: {} B, downloaded: {} ms, status: {})",
+                        decoder.getHeaderSize() + decoder.getContentSize(),
+                        now - flowContext.receivedStartTimestamp(),
+                        resp.getStatus().getCode()
+                );
+            }
+
+            // Read the field once: metrics may be enabled on the client before a metric
+            // object exists, and locking on null would throw.
+            HttpClientWorkerMetric metric = client.metric;
+            if (client.isCollectMetrics() && metric != null) {
+                synchronized (metric) {
+                    metric.incResponsesNum();
+                    metric.addResponseStatusCode(resp.getStatus().getCode());
+                    metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
+                    metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+                    if (lastRequest != null) {
+                        // The start tag is set only once writing has begun; skip when absent.
+                        Object sentAt = lastRequest.getTag(TAG_TIME_START);
+                        if (sentAt instanceof Long) {
+                            metric.addResponseTime(now - (Long) sentAt);
+                        }
+                    }
+                }
+            }
+
+            resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize());
+            resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize());
+            resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp());
+            resp.setTag(TAG_TIME_END, now);
+        }
+
+        return res;
+    }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java Wed May 29 12:32:29 2019 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.client.http;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.st.client.ClientXDataEncoder;
+import com.passus.st.client.FlowContext;
+
+import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE;
+import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE;
+
+/**
+ * Encodes an {@link HttpRequest} into a byte buffer and tags the request with
+ * the encoded header and content sizes.
+ */
+public class HttpClientXDataEncoder implements ClientXDataEncoder<HttpRequest> {
+
+    private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder();
+
+    /**
+     * Encodes the request headers and then the content into {@code out}, and stores
+     * the number of readable bytes each step added as request tags.
+     *
+     * @param request     request to encode; receives the size tags
+     * @param flowContext flow the request belongs to (not consulted for the sizes)
+     * @param out         buffer handed to the request encoder
+     */
+    @Override
+    public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) {
+        // Measure on the buffer actually written to, relative to what it already held.
+        long startSize = out.readableBytes();
+        reqEncoder.encodeHeaders(request, out);
+        long headerSize = out.readableBytes() - startSize;
+        reqEncoder.encodeContent(request, out);
+        long contentSize = out.readableBytes() - startSize - headerSize;
+
+        request.setTag(TAG_HEADER_SIZE, headerSize);
+        request.setTag(TAG_CONTENT_SIZE, contentSize);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -21,33 +21,23 @@ import com.passus.st.metric.MetricsContainer; import org.apache.logging.log4j.Level; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; import static com.passus.st.client.http.HttpConsts.*; import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext; -import static com.passus.st.client.http.filter.HttpFlowUtils.createFlowContext; /** * @author Mirosław Hawrot */ +@Deprecated public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware { - public static final Map<Integer, Long> DEFAULT_TIMEOUTS; - - static { - Map<Integer, Long> defaultTimeouts = new HashMap<>(); - defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L); - defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L); - defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L); - defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L); - defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L); - defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L); - defaultTimeouts.put(STATE_DISCONNECTED, 0L); - DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts); - } - protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); private final Set<SessionInfo> blockedSessions = new HashSet<>(); @@ -538,7 +528,7 @@ } @Override - public void errorOccured(ChannelContext context, Throwable cause) throws Exception { + public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { if (logger.isDebugEnabled()) { 
logger.debug("Error occured. " + cause.getMessage(), cause); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowConst.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowConst.java Wed May 29 12:32:29 2019 +0200 @@ -4,4 +4,6 @@ public static final String PARAM_HTTP_CONTEXT = "http.context"; + private HttpFlowConst() { + } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowDispatcher.java Fri May 24 12:01:51 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,9 +0,0 @@ -package com.passus.st.client.http; - -/** - * - * @author Mirosław Hawrot - */ -public interface HttpFlowDispatcher { - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowParams.java Fri May 24 12:01:51 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,6 +0,0 @@ -package com.passus.st.client.http; - -public class HttpFlowParams { - - private HttpScopes scopes; -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpNullClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -9,6 +9,7 @@ * * @author Mirosław Hawrot */ +@Deprecated public class HttpNullClientWorker extends HttpClientWorker { private boolean working = true;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -20,6 +20,7 @@ /** * @author Mirosław Hawrot */ +@Deprecated @Plugin(name = HttpParallelClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) public final class HttpParallelClientWorker extends HttpFlowBasedClientWorker { @@ -35,7 +36,7 @@ private final Semaphore semaphore = new Semaphore(maxSentRequests); - private final Deque<LocalHttpFlowContext> flowIndex = new ArrayDeque<>(); + private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>(); private boolean closeAllConnections = false; @@ -53,17 +54,17 @@ } @Override - protected LocalHttpFlowContext flowContext(SessionInfo session) { - return (LocalHttpFlowContext) super.flowContext(session); + protected LocalFlowContext flowContext(SessionInfo session) { + return (LocalFlowContext) super.flowContext(session); } @Override - protected LocalHttpFlowContext flowContext(ChannelContext context) { + protected LocalFlowContext flowContext(ChannelContext context) { return flowContext(context.getSessionInfo()); } @Override - protected LocalHttpFlowContext flowContext(SessionEvent event) { + protected LocalFlowContext flowContext(SessionEvent event) { return flowContext(event.getSessionInfo()); } @@ -84,8 +85,8 @@ } @Override - protected LocalHttpFlowContext createFlowContext(SessionInfo session) { - LocalHttpFlowContext flowContext = new LocalHttpFlowContext(session, scopes); + protected LocalFlowContext createFlowContext(SessionInfo session) { + LocalFlowContext flowContext = new LocalFlowContext(session, scopes); flowIndex.add(flowContext); return flowContext; } @@ -114,9 +115,9 @@ debug(flowContext, "Too many connections {}.", flowIndex.size()); } - Iterator<LocalHttpFlowContext> it = flowIndex.descendingIterator(); + Iterator<LocalFlowContext> it = 
flowIndex.descendingIterator(); while (it.hasNext()) { - LocalHttpFlowContext indexFlowContext = it.next(); + LocalFlowContext indexFlowContext = it.next(); if (indexFlowContext.eventsQueue.isEmpty() && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) { close(flowContext); @@ -140,7 +141,7 @@ @Override protected void flowStateChanged(FlowContext flowContext, int oldState) { - LocalHttpFlowContext localFlowContext = (LocalHttpFlowContext) flowContext; + LocalFlowContext localFlowContext = (LocalFlowContext) flowContext; if (oldState == FlowContext.STATE_REQ_SENT) { if (semaphore.availablePermits() <= maxSentRequests) { semaphore.release(); @@ -178,14 +179,14 @@ } } - private void makeFirst(LocalHttpFlowContext flowContext) { + private void makeFirst(LocalFlowContext flowContext) { synchronized (lock) { flowIndex.remove(flowContext); flowIndex.addFirst(flowContext); } } - private void addToQueue(LocalHttpFlowContext flowContext, Event event) { + private void addToQueue(LocalFlowContext flowContext, Event event) { flowContext.eventsQueue.add(event); makeFirst(flowContext); } @@ -222,7 +223,7 @@ case SessionStatusEvent.TYPE: SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - LocalHttpFlowContext flowContext = flowContext((SessionEvent) event); + LocalFlowContext flowContext = flowContext((SessionEvent) event); if (flowContext != null) { if (flowContext.eventsQueue.isEmpty() && flowContext.state() != FlowContext.STATE_REQ_SENT) { @@ -235,7 +236,7 @@ break; case HttpSessionPayloadEvent.TYPE: { HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - LocalHttpFlowContext flowContext = flowContext(payloadEvent); + LocalFlowContext flowContext = flowContext(payloadEvent); if (flowContext != null) { if (flowContext.state() >= FlowContext.STATE_CONNECTING && flowContext.state() < FlowContext.STATE_DISCONNECTING) { @@ -251,7 +252,7 @@ } else { try { SessionInfo session = 
payloadEvent.getSessionInfo(); - flowContext = (LocalHttpFlowContext) register(session); + flowContext = (LocalFlowContext) register(session); addToQueue(flowContext, event); emitter.connect(session, this, index); } catch (Exception e) { @@ -304,11 +305,11 @@ } } - protected static class LocalHttpFlowContext extends FlowContext { + protected static class LocalFlowContext extends FlowContext { private final Queue<Event> eventsQueue; - private LocalHttpFlowContext(SessionInfo session, HttpScopes scopes) { + private LocalFlowContext(SessionInfo session, HttpScopes scopes) { super(session); setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(this, scopes)); eventsQueue = new LinkedList<>();
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Wed May 29 12:32:29 2019 +0200 @@ -3,32 +3,21 @@ import com.passus.net.http.HttpMessageHelper; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; -import com.passus.st.client.http.HttpReqResp; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.emitter.SessionInfo; +import static com.passus.st.Protocols.HTTP; + /** - * * @author Mirosław Hawrot */ -public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpReqResp> { +@Deprecated +public class HttpSessionPayloadEvent extends SessionPayloadEvent<HttpRequest, HttpResponse> { public static final int TYPE = 12; public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) { - this(sessionInfo, new HttpReqResp(req, resp), sourceName); - } - - public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload, String sourceName) { - super(sessionInfo, payload, sourceName); - } - - public HttpRequest getRequest() { - return getPayload().getRequest(); - } - - public HttpResponse getResponse() { - return getPayload().getResponse(); + super(sessionInfo, req, resp, HTTP, sourceName); } @Override @@ -37,13 +26,6 @@ } @Override - public HttpSessionPayloadEvent instanceForWorker(int index) { - HttpReqResp payload = getPayload(); - HttpRequest reqCopy = payload.request == null ? null : new HttpRequest(payload.request); - return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName()); - } - - @Override public String toString() { String req = null; String resp = null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java Wed May 29 12:32:29 2019 +0200 @@ -11,6 +11,7 @@ * * @author Mirosław Hawrot */ +@Deprecated public class HttpStresserWorker extends HttpClientWorker { private int maxRequests = -1;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Wed May 29 12:32:29 2019 +0200 @@ -20,6 +20,7 @@ /** * @author Mirosław Hawrot */ +@Deprecated @Plugin(name = HttpSynchClientWorker.TYPE, category = PluginConstants.CATEGORY_HTTP_CLIENT_WORKER) public class HttpSynchClientWorker extends HttpFlowBasedClientWorker {
--- a/stress-tester/src/main/java/com/passus/st/client/http/extractor/YamlExtractor.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/extractor/YamlExtractor.java Wed May 29 12:32:29 2019 +0200 @@ -1,15 +1,14 @@ package com.passus.st.client.http.extractor; +import org.yaml.snakeyaml.Yaml; + import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.yaml.snakeyaml.Yaml; - /** * @author norbert.rostkowski */ @@ -40,7 +39,6 @@ } } - @Override public CharSequence extract(CharSequence content) throws IOException { Yaml yaml = new Yaml();
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpCsrfFormFilter.java Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpCsrfFormFilter.java Wed May 29 12:32:29 2019 +0200
@@ -100,7 +100,7 @@
                     LOGGER.debug("Could not find request parameter {}", parameters);
                 }
             } catch (IOException ex) {
-                LOGGER.debug("Could not decode request.");
+                LOGGER.debug("Could not decode request.", ex);
             }
         }
     }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMessagePredicate.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpMessagePredicate.java Wed May 29 12:32:29 2019 +0200 @@ -7,13 +7,12 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.st.client.FlowContext; -import com.passus.st.client.http.HttpFlowContext; import com.passus.st.filter.Transformers; + import java.io.IOException; import java.util.function.Predicate; /** - * * @author Mirosław Hawrot */ public class HttpMessagePredicate implements Predicate<HttpMessage> {
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Wed May 29 12:32:29 2019 +0200 @@ -3,43 +3,42 @@ import com.passus.data.ByteBuff; /** - * * @author Mirosław Hawrot */ public interface EmitterHandler { - public default void channelRegistered(ChannelContext context) throws Exception { - - } - - public default void channelUnregistered(ChannelContext context) throws Exception { - - } - - public default void channelActive(ChannelContext context) throws Exception { - - } - - public default void channelInactive(ChannelContext context) throws Exception { + default void channelRegistered(ChannelContext context) throws Exception { } - public default void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - } - - public default void dataWriteStart(ChannelContext context) { + default void channelUnregistered(ChannelContext context) throws Exception { } - public default void dataWritten(ChannelContext context) throws Exception { + default void channelActive(ChannelContext context) throws Exception { } - public default void sessionInvalidated(SessionInfo session) throws Exception { + default void channelInactive(ChannelContext context) throws Exception { } - public default void errorOccured(ChannelContext context, Throwable cause) throws Exception { + default void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + } + + default void dataWriteStart(ChannelContext context) { + + } + + default void dataWritten(ChannelContext context) throws Exception { + + } + + default void sessionInvalidated(SessionInfo session) throws Exception { + + } + + default void errorOccurred(ChannelContext context, Throwable cause) throws Exception { }
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Wed May 29 12:32:29 2019 +0200 @@ -9,6 +9,8 @@ import java.text.ParseException; import java.util.Objects; +import static com.passus.st.Protocols.UNKNOWN; + /** * @author Mirosław Hawrot */ @@ -20,6 +22,8 @@ private final int transport; + private final int protocolId; + private final IpAddress srcIp; private final int srcPort; @@ -46,6 +50,10 @@ this(srcSocket.getIp(), srcSocket.getPort(), dstSocket.getIp(), dstSocket.getPort(), transport); } + public SessionInfo(SocketAddress srcSocket, SocketAddress dstSocket, int transport, int protocolId) { + this(srcSocket.getIp(), srcSocket.getPort(), dstSocket.getIp(), dstSocket.getPort(), transport, protocolId); + } + public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort) { this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort); } @@ -54,13 +62,20 @@ this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT); } + public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport) { - this(srcIp, srcPort, dstIp, dstPort, transport, UniqueIdGenerator.generate()); + this(srcIp, srcPort, dstIp, dstPort, transport, UNKNOWN); } - public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport, String sessionId) { + public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport, int protocolId) { + this(srcIp, srcPort, dstIp, dstPort, transport, protocolId, UniqueIdGenerator.generate()); + } + + public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, + int transport, int protocolId, String sessionId) { this.sessionId = sessionId; this.transport = transport; + this.protocolId = protocolId; this.srcPort = srcPort; this.dstPort = dstPort; this.srcIp = srcIp; @@ -81,6 +96,10 @@ return transport; } + public int getProtocolId() 
{ + return protocolId; + } + public String getSessionId() { return sessionId; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java Wed May 29 12:32:29 2019 +0200 @@ -317,7 +317,7 @@ KeyContext keyContext = (KeyContext) key.attachment(); try { - keyContext.handler.errorOccured(keyContext.channelContext, cause); + keyContext.handler.errorOccurred(keyContext.channelContext, cause); } catch (Exception e) { logger.debug(e.getMessage(), e); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java Wed May 29 12:32:29 2019 +0200 @@ -428,7 +428,7 @@ KeyContext keyContext = (KeyContext) key.attachment(); try { - keyContext.handler.errorOccured(keyContext.channelContext, cause); + keyContext.handler.errorOccurred(keyContext.channelContext, cause); } catch (Exception e) { logger.debug(e.getMessage(), e); }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java Wed May 29 12:32:29 2019 +0200 @@ -205,12 +205,12 @@ } }); - Map<String, ValueExtractor> appVars = (Map<String, ValueExtractor>) context.get( + Map<String, ValueExtractor> appVars = context.get( ConfigurationContextConsts.APP_VARS); if (appVars != null) { HttpVarsFilter httpVarsFilter = new HttpVarsFilter(appVars); - List<HttpFilter> filters = (List<HttpFilter>) context.get( + List<HttpFilter> filters = context.get( ConfigurationContextConsts.HTTP_FILTERS); if (filters != null) { filters.add(httpVarsFilter);
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java Wed May 29 12:32:29 2019 +0200 @@ -1,15 +1,14 @@ package com.passus.st.metric; /** - * * @author Mirosław Hawrot */ public interface MetricSource { - public boolean isCollectMetrics(); + boolean isCollectMetrics(); - public void setCollectMetrics(boolean collectMetrics); + void setCollectMetrics(boolean collectMetrics); - public void writeMetrics(MetricsContainer container); + void writeMetrics(MetricsContainer container); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Wed May 29 12:32:29 2019 +0200 @@ -4,22 +4,16 @@ import com.passus.data.ByteBuff; import com.passus.data.DataSource; import com.passus.data.HeapByteBuff; -import com.passus.net.http.HttpConsts; -import com.passus.net.http.HttpHeaderEntry; -import com.passus.net.http.HttpHeaders; -import com.passus.net.http.HttpMessage; -import com.passus.net.http.HttpMethod; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.http.HttpReqResp; +import com.passus.net.http.*; import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.SessionInfo; + +import java.io.IOException; + import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE; -import java.io.IOException; /** - * * @author Mirosław Hawrot */ public class HttpSessionPayloadEventDataWriter { @@ -153,29 +147,11 @@ return size; } - public void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException { - ByteBuff buffer = new HeapByteBuff(); - long size = encodeMessage(msg, buffer); - DataSource content = msg.getContent(); - if (content != null) { - size += content.available(); - size += ncDataHelper.writeLongVLC(buffer, content.available()); - } - writer.writeSessionPayloadHeader(timestamp, session, (byte) 1); - writer.writeSessionPayloadData(buffer); - if (content != null) { - writer.writeSessionPayloadData(content); - } - - writer.closeSessionPayloadBlock(); - } - - public void encodeFullMessages(long timestamp, SessionInfo session, HttpReqResp messages, NcDataBlockWriter writer) throws IOException { + public void encodeFullMessages(long timestamp, SessionInfo session, HttpRequest 
req, HttpResponse resp, NcDataBlockWriter writer) throws IOException { ByteBuff reqBuffer = new HeapByteBuff(); ByteBuff respBuffer = new HeapByteBuff(); byte flags = 0; - HttpRequest req = messages.getRequest(); DataSource reqContent = null; if (reqWriteMode.code() > HttpWriteMode.SKIP.code() && req != null) { @@ -195,7 +171,6 @@ } } - HttpResponse resp = messages.getResponse(); DataSource respContent = null; if (respWriteMode.code() > HttpWriteMode.SKIP.code() && resp != null) { @@ -233,7 +208,7 @@ public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException { long time = event.getTimestamp(); SessionInfo session = event.getSessionInfo(); - encodeFullMessages(time, session, event.getPayload(), writer); + encodeFullMessages(time, session, event.getRequest(), event.getResponse(), writer); } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/DefaultValueCoderResolver.java Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/DefaultValueCoderResolver.java Wed May 29 12:32:29 2019 +0200
@@ -62,7 +62,7 @@
             return code;
         }
 
-        throw new IllegalArgumentException("Cannot encode object of type: " + cls.getSimpleName());
+        throw new IllegalArgumentException("Cannot encode object of type: " + cls.getName());
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Wed May 29 12:32:29 2019 +0200 @@ -10,14 +10,17 @@ import com.passus.config.schema.NodeDefinition; import com.passus.config.schema.NodeDefinitionCreator; import com.passus.config.validation.LongValidator; -import com.passus.data.*; +import com.passus.data.ByteBuff; +import com.passus.data.ByteBuffAllocator; +import com.passus.data.ByteBuffDataSource; +import com.passus.data.DefaultByteBuffAllocator; import com.passus.st.client.DataEvents; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; import com.passus.st.client.EventHandler; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; import com.passus.st.client.http.HttpReqResp; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader; @@ -31,6 +34,7 @@ import java.io.IOException; import static com.passus.config.schema.ConfigurationSchemaBuilder.*; +import static com.passus.st.Protocols.HTTP; /** * @author Mirosław Hawrot @@ -237,7 +241,7 @@ sessionInfo.setSourceName(getName()); ByteBuff payload = readPayload(payloadBlock.data()); HttpReqResp messages = httpReader.decodeMessages(payload); - handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, getName())); + handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), HTTP, getName())); break; default: reader.read();
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Fri May 24 12:01:51 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Wed May 29 12:32:29 2019 +0200
@@ -8,8 +8,8 @@
 import com.passus.net.session.SessionKey;
 import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 
 import java.util.HashMap;
@@ -17,6 +17,7 @@
 
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE;
 import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED;
+import static com.passus.st.Protocols.HTTP;
 
 public class PcapHttpListener implements HttpSessionListener {
 
@@ -31,7 +32,7 @@
     private final PcapSessionEventSourceMetric metric;
 
     public PcapHttpListener(String sourceName, int maxSessionNum, EventHandler eventHandler,
-            boolean collectMetric, PcapSessionEventSourceMetric metric) {
+                            boolean collectMetric, PcapSessionEventSourceMetric metric) {
         this.sourceName = sourceName;
         lastRequests = new HashMap<>(maxSessionNum);
         this.eventHandler = eventHandler;
@@ -43,10 +44,10 @@
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                context.getProtocol(), context.getId());
+                context.getProtocol(), HTTP, context.getId());
         info.setSourceName(sourceName);
 
-        Event event = new HttpSessionPayloadEvent(info, req, resp, sourceName);
+        Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName);
         event.setTimestamp(timestamp);
         eventHandler.handle(event);
         if (collectMetric) {
@@ -84,7 +85,7 @@
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                context.getProtocol(), context.getId());
+                context.getProtocol(), HTTP, context.getId());
         info.setSourceName(sourceName);
 
         Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName);
--- a/stress-tester/src/test/java/com/passus/st/PcapScannerTest.java Fri May 24 12:01:51 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,41 +0,0 @@ -package com.passus.st; - -import com.passus.net.http.HttpHeaders; -import org.apache.logging.log4j.Level; -import org.testng.annotations.Test; - -/** - * - * @author mikolaj.podbielski - */ -public class PcapScannerTest { - - public static void main(String[] args) { - String ST = "C:\\Users\\mikolaj.podbielski\\Desktop\\hg\\stress-tester\\stress-tester\\"; - PcapScanner.main(ST + "ndiag.pcap"); -// scanAll(); - } - - public static void scanAll() { - try { - Log4jConfigurationFactory.enableFactory(Level.INFO); - PcapScanner.Extractor extractor = (event) -> event.getResponse().getHeaders().get(HttpHeaders.SET_COOKIE); - PcapScanner scanner = new PcapScanner(extractor); - scanAll(scanner); - System.out.println("\n --== Scanned values:"); - scanner.getValues().forEach(System.out::println); - } catch (Exception e) { - e.printStackTrace(System.out); - } - } - - private static void scanAll(PcapScanner scanner) throws InterruptedException { - String ST = "C:\\Users\\mikolaj.podbielski\\Desktop\\hg\\stress-tester\\stress-tester\\"; - scanner.scan(ST + "ndiag.pcap", 8080); - scanner.scan(ST + "netim.pcap", 9190); - scanner.scan(ST + "arx.pcap", 80); - scanner.scan(ST + "http5b1.pcap", 80); - scanner.scan(ST + "basic_digest.pcap", 80); - } - -}
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Wed May 29 12:32:29 2019 +0200 @@ -71,7 +71,7 @@ } @Override - public final void errorOccured(ChannelContext context, Throwable cause) throws Exception { + public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context); event.setCause(cause); add(event);
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java Wed May 29 12:32:29 2019 +0200 @@ -4,23 +4,22 @@ import com.passus.data.ByteStringImpl; import com.passus.data.SliceByteString; import com.passus.st.client.Event; -import com.passus.st.client.TestHttpClientListener; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.TestHttpClientListener.HttpClientEvent; import com.passus.st.client.TestHttpClientListener.HttpClientEventType; import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent; -import com.passus.st.client.http.HttpSessionPayloadEvent; -import com.passus.st.source.NcEventSourceTest; -import static com.passus.st.utils.HttpMessageAssert.assertMessages; -import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; +import org.testng.AssertJUnit; + import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; -import org.testng.AssertJUnit; -import static org.testng.AssertJUnit.assertEquals; + +import static com.passus.st.Protocols.HTTP; +import static com.passus.st.utils.HttpMessageAssert.assertMessages; +import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; /** - * * @author Mirosław Hawrot */ public class Assert extends AssertJUnit { @@ -42,14 +41,14 @@ public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) { expectedEvents = expectedEvents.stream() - .filter((e) -> e.getType() == HttpSessionPayloadEvent.TYPE) + .filter((e) -> e.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) e).getProtocolId() == HTTP) .collect(Collectors.toList()); List<Event> events = new ArrayList<>(httpClientEvents.size()); httpClientEvents.forEach((event) -> { if (event.getType() == HttpClientEventType.RESPONSE_RECEIVED) { ResponseReceivedEvent respReceived = 
(ResponseReceivedEvent) event; - HttpSessionPayloadEvent payloadEvent = new HttpSessionPayloadEvent(null, respReceived.getRequest(), respReceived.getResponse(), null); + SessionPayloadEvent payloadEvent = new SessionPayloadEvent(null, respReceived.getRequest(), respReceived.getResponse(), HTTP, null); events.add(payloadEvent); } }); @@ -64,9 +63,9 @@ Event event = events.get(i); assertEquals(expectedEvent.getType(), event.getType()); - if (event.getType() == HttpSessionPayloadEvent.TYPE) { - HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent; - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; + if (event.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) event).getProtocolId() == HTTP) { + SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent; + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Fri May 24 12:01:51 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Wed May 29 12:32:29 2019 +0200 @@ -1,6 +1,5 @@ package com.passus.st.utils; -import static com.passus.st.utils.Assert.*; import com.passus.data.ByteBuff; import com.passus.data.ByteString; import com.passus.data.DataSourceUtils; @@ -8,11 +7,14 @@ import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; + import java.util.List; import java.util.Set; +import static com.passus.st.utils.Assert.assertEquals; +import static com.passus.st.utils.Assert.fail; + /** - * * @author Mirosław Hawrot */ public class HttpMessageAssert { @@ -20,6 +22,10 @@ private HttpMessageAssert() { } + public static void assertMessages(Object expectedMsg, Object msg) { + assertMessages((HttpMessage) expectedMsg, (HttpMessage) msg); + } + public static void assertMessages(HttpMessage expectedMsg, HttpMessage msg) { if (expectedMsg == msg) { return; @@ -33,6 +39,10 @@ } } + public static void assertMessagesContent(Object expectedMsg, Object msg) { + assertMessagesContent((HttpMessage) expectedMsg, (HttpMessage) msg); + } + public static void assertMessagesContent(HttpMessage expectedMsg, HttpMessage msg) { if (expectedMsg == msg) { return;