changeset 1287:c04b5c168a9c

PcapSessionEventSource.orderQueueTimeout
author Devel 2
date Fri, 10 Jul 2020 13:44:01 +0200
parents 356c10f8b486
children ded8813add01
files stress-tester/src/main/java/com/passus/st/source/FlushableEventHandler.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java
diffstat 3 files changed, 56 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/FlushableEventHandler.java	Fri Jul 10 13:44:01 2020 +0200
@@ -0,0 +1,9 @@
+package com.passus.st.source;
+
+import com.passus.st.client.EventHandler;
+
+public interface FlushableEventHandler extends EventHandler { // an EventHandler that additionally exposes an explicit flush() operation
+
+    public void flush(); // NOTE(review): presumably pushes out any events the implementation is still holding - confirm against implementations
+
+}
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Jul 10 09:47:58 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Jul 10 13:44:01 2020 +0200
@@ -28,6 +28,7 @@
 import com.passus.net.source.pcap.FrameDecoderImpl;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
+import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
@@ -61,6 +62,8 @@
 
     public static final int LOOP_INFINITE = 0;
 
+    public static final long DEFAULT_ORDER_QUEUE_TIMEOUT = 1_000; // initial/fallback value of orderQueueTimeout; unit presumably milliseconds - TODO confirm
+
     private String name;
 
     private String pcapFile;
@@ -87,6 +90,8 @@
 
     private boolean skipResponses;
 
+    private long orderQueueTimeout = DEFAULT_ORDER_QUEUE_TIMEOUT;
+
     public PcapSessionEventSource() {
         this.name = UniqueIdGenerator.generate();
 
@@ -178,6 +183,15 @@
         this.loopDelay = loopDelay;
     }
 
+    public long getOrderQueueTimeout() { // accessor for the currently configured orderQueueTimeout
+        return orderQueueTimeout;
+    }
+
+    public void setOrderQueueTimeout(long orderQueueTimeout) { // a value of 0 makes start-up use a pass-through handler instead of a TimeWindowEventHandler
+        Assert.greaterOrEqualZero(orderQueueTimeout, "orderQueueTimeout"); // NOTE(review): presumably rejects negative values - confirm Assert contract
+        this.orderQueueTimeout = orderQueueTimeout;
+    }
+
     public void addAnalyzer(SessionAnalyzer analyzer) {
         Assert.notNull(analyzer, "analyzer");
         analyzers.add(analyzer);
@@ -219,6 +233,7 @@
         loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS);
         loopDelay = config.getLong("loopDelay", 0L);
         skipResponses = config.getBoolean("skipResponses", false);
+        orderQueueTimeout = config.getLong("orderQueueTimeout", DEFAULT_ORDER_QUEUE_TIMEOUT);
 
         setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
 
@@ -267,7 +282,24 @@
         }
 
         try {
-            SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession);
+            FlushableEventHandler eventHandler; // handler given to the SessionPacketHandler created right after this selection
+            if (orderQueueTimeout == 0) { // timeout of 0: no TimeWindowEventHandler, events are forwarded directly
+                eventHandler = new FlushableEventHandler() {
+                    @Override
+                    public void flush() { // intentionally empty: this variant holds no events
+
+                    }
+
+                    @Override
+                    public void handle(Event event) {
+                        handler.handle(event); // straight delegation to the enclosing source's handler
+                    }
+                };
+            } else { // non-zero timeout: wrap the handler in a TimeWindowEventHandler configured with that timeout
+                eventHandler = new TimeWindowEventHandler(handler, orderQueueTimeout);
+            }
+
+            SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession, eventHandler);
             pcapThread = new PcapThread(pcapFile, sessionPacketHandler, loops);
             pcapThread.start();
             started = true;
@@ -473,11 +505,12 @@
 
         private final CustomTimeGenerator timeGenerator = new CustomTimeGenerator();
 
-        private final TimeWindowEventHandler eventHandler = new TimeWindowEventHandler(handler);
+        private final FlushableEventHandler eventHandler;
 
         private final PcapSessionAnalyzerHookContext hookContext;
 
-        private SessionPacketHandler(boolean allowPartialSession) {
+        private SessionPacketHandler(boolean allowPartialSession, FlushableEventHandler enventHandler) {
+            this.eventHandler = enventHandler;
             tcpProc = new TcpSessionProcessor();
             tcpProc.setTimeGenerator(timeGenerator);
             tcpProc.setAllowPartialSession(allowPartialSession);
@@ -576,6 +609,7 @@
                     tupleDef("fileName", FILE_STR_DEF),
                     tupleDef("loops", INT_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("loopDelay", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
+                    tupleDef("orderQueueTimeout", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("sessionProc", sessionProcDef).setRequired(false),
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),
                     tupleDef("skipResponses", BOOLEAN_DEF).setRequired(false),
--- a/stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java	Fri Jul 10 09:47:58 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/TimeWindowEventHandler.java	Fri Jul 10 13:44:01 2020 +0200
@@ -5,17 +5,24 @@
 import com.passus.st.client.EventHandler;
 import java.util.ArrayDeque;
 
-public class TimeWindowEventHandler implements EventHandler {
+public class TimeWindowEventHandler implements FlushableEventHandler {
 
     private final OrderedSlidingTimeWindow<Event> timeWindow = new OrderedSlidingTimeWindow();
+
     private final EventHandler handler;
+
     private final long expires;
+
     private final long flushPeriod;
 
     private long nextFlushTime = -1;
 
     public TimeWindowEventHandler(EventHandler handler) { // convenience constructor relying on built-in defaults
-        this(handler, 2_000, 500);
+        this(handler, 500, 100); // defaults after this change: expires = 500, flushPeriod = 100 (units not shown here - TODO confirm)
+    }
+
+    public TimeWindowEventHandler(EventHandler handler, long timeout) { // timeout is forwarded as the "expires" argument
+        this(handler, timeout, 100); // flushPeriod stays fixed at 100 for this overload
     }
 
     public TimeWindowEventHandler(EventHandler handler, long expires, long flushPeriod) {
@@ -24,7 +31,7 @@
         this.flushPeriod = flushPeriod;
     }
 
-    void flush() {
+    public void flush() {
         flush(Long.MAX_VALUE);
     }