Mercurial > stress-tester
changeset 1238:457d050a95f6
FlowExecutor.maxSleepTime, PcapSessionEventSource.skipResponses
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Tue Jun 30 12:02:17 2020 +0200 @@ -24,8 +24,7 @@ import java.util.*; import static com.passus.config.schema.ConfigurationSchemaBuilder.*; -import static com.passus.st.config.CommonNodeDefs.BOOLEAN_DEF; -import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF; +import static com.passus.st.config.CommonNodeDefs.*; public class FlowExecutor implements EventHandler, MetricSource, Service, Configurable { @@ -36,6 +35,7 @@ private static final boolean DEFAULT_COLLECT_METRICS = false; public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; + public static final long MAX_SLEEP_TIME = 0; private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class); @@ -53,6 +53,8 @@ private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + private long maxSleepTime = MAX_SLEEP_TIME; + private String workerType = "synch"; private int workersNum = DEFAULT_WORKERS_NUM; @@ -125,6 +127,14 @@ this.sleepFactor = sleepFactor; } + public long getMaxSleepTime() { + return maxSleepTime; + } + + public void setMaxSleepTime(long maxSleepTime) { + this.maxSleepTime = maxSleepTime; + } + public FlowWorkerDispatcher getDispatcher() { return dispatcher; } @@ -196,6 +206,7 @@ collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS); dispatcher = config.get("dispatcher", null); sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP); + maxSleepTime = config.getLong("maxSleepTime", MAX_SLEEP_TIME); } @Override @@ -257,9 +268,13 @@ worker.setFilterChain(filterChain.instanceForWorker(i)); if (worker instanceof SynchFlowWorker) { - ((SynchFlowWorker) worker).setSleepFactor(sleepFactor); + SynchFlowWorker synchWorker = (SynchFlowWorker) worker; + synchWorker.setSleepFactor(sleepFactor); + synchWorker.setMaxSleepTime(maxSleepTime); } else if (worker instanceof ParallelFlowWorker) { - ((ParallelFlowWorker) worker).setSleepFactor(sleepFactor); + ParallelFlowWorker parallelWorker = (ParallelFlowWorker) worker; + parallelWorker.setSleepFactor(sleepFactor); + parallelWorker.setMaxSleepTime(maxSleepTime); } worker.setCollectMetrics(collectMetrics); @@ -367,6 +382,7 @@ tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false), tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false), tupleDef("sleepFactor", valueDefFloat()).setRequired(false), + tupleDef("maxSleepTime", LONG_DEF).setRequired(false), tupleDef("dispatcher", valueDef() .addValidator(new EnumValidator(DISPATCHERS, false)) .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Jun 30 12:02:17 2020 +0200 @@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.STATE_CONNECTED; +import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME; import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @@ -42,6 +43,8 @@ private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + private long maxSleepTime = MAX_SLEEP_TIME; + public ParallelFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); } @@ -72,6 +75,14 @@ this.sleepFactor = sleepFactor; } + public long getMaxSleepTime() { + return maxSleepTime; + } + + public void setMaxSleepTime(long maxSleepTime) { + this.maxSleepTime = maxSleepTime; + } + @Override public boolean isWorking() { return working; @@ -128,7 +139,7 @@ flowHandler.init(flowContext); flowContext.client(flowHandler); - FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor); + FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor, maxSleepTime); flowThread.start(); sessions.put(session, flowThread); return flowThread; @@ -265,16 +276,19 @@ private final float sleepFactor; + private final long maxSleepTime; + private long lastEventTimestamp = -1; private final BlockingQueue<Event> queue; - private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor) { + private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime) { this.emitter = emitter; this.flowContext = flowContext; flowProcessor = new FlowProcessor(this, logger, index); this.queue = new ArrayBlockingQueue<>(queueSize); this.sleepFactor = sleepFactor; + this.maxSleepTime = maxSleepTime; } public FlowContext flowContext() { @@ -376,6 +390,10 @@ if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { if (lastEventTimestamp != -1) { long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + if (maxSleepTime > 0 && timeToSleep > maxSleepTime) { + timeToSleep = maxSleepTime; + } + sleepSilently(timeToSleep); } lastEventTimestamp = event.getTimestamp();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Jun 30 12:02:17 2020 +0200 @@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME; import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @@ -47,6 +48,8 @@ private long lastEventTimestamp = -1; + private long maxSleepTime = MAX_SLEEP_TIME; + public SynchFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); } @@ -73,6 +76,14 @@ this.sleepFactor = sleepFactor; } + public long getMaxSleepTime() { + return maxSleepTime; + } + + public void setMaxSleepTime(long maxSleepTime) { + this.maxSleepTime = maxSleepTime; + } + @Override public int activeConnections() { int count = 0; @@ -259,6 +270,10 @@ if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { if (lastEventTimestamp != -1) { long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + if (maxSleepTime > 0 && timeToSleep > maxSleepTime) { + timeToSleep = maxSleepTime; + } + sleepSilently(timeToSleep); } lastEventTimestamp = event.getTimestamp();
--- a/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java Tue Jun 30 12:02:17 2020 +0200 @@ -20,6 +20,8 @@ public static NodeDefinition STRING_LIST_DEF = listDef(STRING_DEF); + public static NodeDefinition LONG_DEF = valueDefLong(); + public static NodeDefinition PORT_DEF = valueDefInteger().addValidator(PortValidator.INSTANCE); public static NodeDefinition INT_GREATER_THAN_ZERO_DEF = valueDefInteger().addValidator(LongValidator.GREATER_ZERO);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Tue Jun 30 12:02:17 2020 +0200 @@ -15,37 +15,49 @@ private final Map<SessionInfo, HttpRequest> lastRequests; + private final boolean skipResponses; + public PcapHttpListener(String sourceName, EventHandler eventHandler, - boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { + boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum, + boolean skipResponses) { super(sourceName, eventHandler, collectMetric, metric, HTTP); - lastRequests = new HashMap<>(maxSessionNum); + this.skipResponses = skipResponses; + this.lastRequests = skipResponses ? null : new HashMap<>(maxSessionNum); } @Override public void onMessageReceived(SessionContext context, HttpMessage msg, long timestamp) { SessionInfo session = SessionInfo.create(context, protocolId); - if (msg.isRequest()) { - HttpRequest lastRequest = lastRequests.put(session, (HttpRequest) msg); - if (lastRequest != null) { - firePayloadEvent(lastRequest, null, session, timestamp); + if (skipResponses) { + if (msg.isRequest()) { + firePayloadEvent(msg, null, session, timestamp); } } else { - HttpRequest req = lastRequests.remove(session); - firePayloadEvent(req, msg, session, timestamp); + if (msg.isRequest()) { + HttpRequest lastRequest = lastRequests.put(session, (HttpRequest) msg); + if (lastRequest != null) { + firePayloadEvent(lastRequest, null, session, timestamp); + } + } else { + HttpRequest req = lastRequests.remove(session); + firePayloadEvent(req, msg, session, timestamp); + } } } @Override protected void onSessionClosed(SessionContext context, int status, long timestamp) { SessionInfo session = SessionInfo.create(context, protocolId); - HttpRequest req = lastRequests.remove(session); - if (req != null) { - firePayloadEvent(req, null, session, timestamp); + if (!skipResponses) { + HttpRequest req = lastRequests.remove(session); + if (req != null) { + firePayloadEvent(req, null, session, timestamp); + } } } public void onLoopEnd(long timestamp) { - if (!lastRequests.isEmpty()) { + if (!skipResponses && !lastRequests.isEmpty()) { lastRequests.forEach((session, req) -> { firePayloadEvent(req, null, session, timestamp); });
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java Tue Jun 30 12:02:17 2020 +0200 @@ -18,7 +18,8 @@ context.getEventHandler(), context.isCollectMetric(), context.getMetric(), - context.getTcpProcessor().getMaxSessionNumber()); + context.getTcpProcessor().getMaxSessionNumber(), + skipResponses); analyzer.setListener(listener); tcpProc.addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionAnalyzerHook.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionAnalyzerHook.java Tue Jun 30 12:02:17 2020 +0200 @@ -5,6 +5,16 @@ public abstract class PcapSessionAnalyzerHook { + protected boolean skipResponses; + + public boolean isSkipResponses() { + return skipResponses; + } + + public void setSkipResponses(boolean skipResponses) { + this.skipResponses = skipResponses; + } + public boolean supports(SessionAnalyzer obj) { Assert.notNull(obj, "object"); return supports(obj.getClass());
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Tue Jun 30 10:56:04 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Tue Jun 30 12:02:17 2020 +0200 @@ -85,6 +85,8 @@ private List<SessionAnalyzer> analyzers = new ArrayList<>(); + private boolean skipResponses; + public PcapSessionEventSource() { this.name = UniqueIdGenerator.generate(); @@ -216,6 +218,7 @@ pcapFile = config.getString("fileName"); loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS); loopDelay = config.getLong("loopDelay", 0L); + skipResponses = config.getBoolean("skipResponses", false); setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); @@ -253,6 +256,10 @@ return; } + hooks.forEach((h) -> { + h.setSkipResponses(skipResponses); + }); + if (handler == null) { throw new IllegalStateException("Handler cannot be null."); } else if (pcapFile == null) { @@ -571,6 +578,7 @@ tupleDef("loopDelay", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false), tupleDef("sessionProc", sessionProcDef).setRequired(false), tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false), + tupleDef("skipResponses", BOOLEAN_DEF).setRequired(false), tupleDef("analyzers", dynaKeyValueVaryListDef("type", pf).setTransformToPluginObject(true) ).setRequired(false)