Mercurial > stress-tester
changeset 1287:c04b5c168a9c
PcapSessionEventSource.orderQueueTimeout
author | Devel 2 |
---|---|
date | Fri, 10 Jul 2020 13:44:01 +0200 |
parents | 356c10f8b486 |
children | ded8813add01 |
files | stress-tester/src/main/java/com/passus/st/source/FlushableEventHandler.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java |
diffstat | 3 files changed, 56 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/FlushableEventHandler.java Fri Jul 10 13:44:01 2020 +0200 @@ -0,0 +1,9 @@ +package com.passus.st.source; + +import com.passus.st.client.EventHandler; + +public interface FlushableEventHandler extends EventHandler { + + public void flush(); + +}
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Fri Jul 10 09:47:58 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Fri Jul 10 13:44:01 2020 +0200 @@ -28,6 +28,7 @@ import com.passus.net.source.pcap.FrameDecoderImpl; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; +import com.passus.st.client.Event; import com.passus.st.client.EventHandler; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; @@ -61,6 +62,8 @@ public static final int LOOP_INFINITE = 0; + public static final long DEFAULT_ORDER_QUEUE_TIMEOUT = 1_000; + private String name; private String pcapFile; @@ -87,6 +90,8 @@ private boolean skipResponses; + private long orderQueueTimeout = DEFAULT_ORDER_QUEUE_TIMEOUT; + public PcapSessionEventSource() { this.name = UniqueIdGenerator.generate(); @@ -178,6 +183,15 @@ this.loopDelay = loopDelay; } + public long getOrderQueueTimeout() { + return orderQueueTimeout; + } + + public void setOrderQueueTimeout(long orderQueueTimeout) { + Assert.greaterOrEqualZero(orderQueueTimeout, "orderQueueTimeout"); + this.orderQueueTimeout = orderQueueTimeout; + } + public void addAnalyzer(SessionAnalyzer analyzer) { Assert.notNull(analyzer, "analyzer"); analyzers.add(analyzer); @@ -219,6 +233,7 @@ loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS); loopDelay = config.getLong("loopDelay", 0L); skipResponses = config.getBoolean("skipResponses", false); + orderQueueTimeout = config.getLong("orderQueueTimeout", DEFAULT_ORDER_QUEUE_TIMEOUT); setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); @@ -267,7 +282,24 @@ } try { - SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession); + FlushableEventHandler eventHandler; + if (orderQueueTimeout == 0) { + eventHandler = new FlushableEventHandler() { + @Override + public void flush() { + + } + + @Override + public void handle(Event event) { + handler.handle(event); + } + }; + } else { + eventHandler = new TimeWindowEventHandler(handler, orderQueueTimeout); + } + + SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession, eventHandler); pcapThread = new PcapThread(pcapFile, sessionPacketHandler, loops); pcapThread.start(); started = true; @@ -473,11 +505,12 @@ private final CustomTimeGenerator timeGenerator = new CustomTimeGenerator(); - private final TimeWindowEventHandler eventHandler = new TimeWindowEventHandler(handler); + private final FlushableEventHandler eventHandler; private final PcapSessionAnalyzerHookContext hookContext; - private SessionPacketHandler(boolean allowPartialSession) { + private SessionPacketHandler(boolean allowPartialSession, FlushableEventHandler enventHandler) { + this.eventHandler = enventHandler; tcpProc = new TcpSessionProcessor(); tcpProc.setTimeGenerator(timeGenerator); tcpProc.setAllowPartialSession(allowPartialSession); @@ -576,6 +609,7 @@ tupleDef("fileName", FILE_STR_DEF), tupleDef("loops", INT_GREATER_EQUAL_ZERO_DEF).setRequired(false), tupleDef("loopDelay", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false), + tupleDef("orderQueueTimeout", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false), tupleDef("sessionProc", sessionProcDef).setRequired(false), tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false), tupleDef("skipResponses", BOOLEAN_DEF).setRequired(false),
--- a/stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java Fri Jul 10 09:47:58 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java Fri Jul 10 13:44:01 2020 +0200 @@ -5,17 +5,24 @@ import com.passus.st.client.EventHandler; import java.util.ArrayDeque; -public class TimeWindowEventHandler implements EventHandler { +public class TimeWindowEventHandler implements FlushableEventHandler { private final OrderedSlidingTimeWindow<Event> timeWindow = new OrderedSlidingTimeWindow(); + private final EventHandler handler; + private final long expires; + private final long flushPeriod; private long nextFlushTime = -1; public TimeWindowEventHandler(EventHandler handler) { - this(handler, 2_000, 500); + this(handler, 500, 100); + } + + public TimeWindowEventHandler(EventHandler handler, long timeout) { + this(handler, timeout, 100); } public TimeWindowEventHandler(EventHandler handler, long expires, long flushPeriod) { @@ -24,7 +31,7 @@ this.flushPeriod = flushPeriod; } - void flush() { + public void flush() { flush(Long.MAX_VALUE); }