changeset 1238:457d050a95f6

FlowExecutor.maxSleepTime, PcapSessionEventSource.skipResponses
author Devel 2
date Tue, 30 Jun 2020 12:02:17 +0200
parents e8d302936240
children ce9ac16deb77
files stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java
diffstat 8 files changed, 101 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Tue Jun 30 12:02:17 2020 +0200
@@ -24,8 +24,7 @@
 import java.util.*;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
-import static com.passus.st.config.CommonNodeDefs.BOOLEAN_DEF;
-import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF;
+import static com.passus.st.config.CommonNodeDefs.*;
 
 public class FlowExecutor implements EventHandler, MetricSource, Service, Configurable {
 
@@ -36,6 +35,7 @@
     private static final boolean DEFAULT_COLLECT_METRICS = false;
 
     public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
+    public static final long MAX_SLEEP_TIME = 0;
 
     private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class);
 
@@ -53,6 +53,8 @@
 
     private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
 
+    private long maxSleepTime = MAX_SLEEP_TIME;
+
     private String workerType = "synch";
 
     private int workersNum = DEFAULT_WORKERS_NUM;
@@ -125,6 +127,14 @@
         this.sleepFactor = sleepFactor;
     }
 
+    public long getMaxSleepTime() {
+        return maxSleepTime;
+    }
+
+    public void setMaxSleepTime(long maxSleepTime) {
+        this.maxSleepTime = maxSleepTime;
+    }
+
     public FlowWorkerDispatcher getDispatcher() {
         return dispatcher;
     }
@@ -196,6 +206,7 @@
         collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
         dispatcher = config.get("dispatcher", null);
         sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP);
+        maxSleepTime = config.getLong("maxSleepTime", MAX_SLEEP_TIME);
     }
 
     @Override
@@ -257,9 +268,13 @@
             worker.setFilterChain(filterChain.instanceForWorker(i));
 
             if (worker instanceof SynchFlowWorker) {
-                ((SynchFlowWorker) worker).setSleepFactor(sleepFactor);
+                SynchFlowWorker synchWorker = (SynchFlowWorker) worker;
+                synchWorker.setSleepFactor(sleepFactor);
+                synchWorker.setMaxSleepTime(maxSleepTime);
             } else if (worker instanceof ParallelFlowWorker) {
-                ((ParallelFlowWorker) worker).setSleepFactor(sleepFactor);
+                ParallelFlowWorker parallelWorker = (ParallelFlowWorker) worker;
+                parallelWorker.setSleepFactor(sleepFactor);
+                parallelWorker.setMaxSleepTime(maxSleepTime);
             }
 
             worker.setCollectMetrics(collectMetrics);
@@ -367,6 +382,7 @@
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),
                     tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
                     tupleDef("sleepFactor", valueDefFloat()).setRequired(false),
+                    tupleDef("maxSleepTime", LONG_DEF).setRequired(false),
                     tupleDef("dispatcher", valueDef()
                             .addValidator(new EnumValidator(DISPATCHERS, false))
                             .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Jun 30 12:02:17 2020 +0200
@@ -17,6 +17,7 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.STATE_CONNECTED;
+import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME;
 import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
@@ -42,6 +43,8 @@
 
     private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
 
+    private long maxSleepTime = MAX_SLEEP_TIME;
+
     public ParallelFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
     }
@@ -72,6 +75,14 @@
         this.sleepFactor = sleepFactor;
     }
 
+    public long getMaxSleepTime() {
+        return maxSleepTime;
+    }
+
+    public void setMaxSleepTime(long maxSleepTime) {
+        this.maxSleepTime = maxSleepTime;
+    }
+
     @Override
     public boolean isWorking() {
         return working;
@@ -128,7 +139,7 @@
         flowHandler.init(flowContext);
         flowContext.client(flowHandler);
 
-        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor);
+        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor, maxSleepTime);
         flowThread.start();
         sessions.put(session, flowThread);
         return flowThread;
@@ -265,16 +276,19 @@
 
         private final float sleepFactor;
 
+        private final long maxSleepTime;
+
         private long lastEventTimestamp = -1;
 
         private final BlockingQueue<Event> queue;
 
-        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor) {
+        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime) {
             this.emitter = emitter;
             this.flowContext = flowContext;
             flowProcessor = new FlowProcessor(this, logger, index);
             this.queue = new ArrayBlockingQueue<>(queueSize);
             this.sleepFactor = sleepFactor;
+            this.maxSleepTime = maxSleepTime;
         }
 
         public FlowContext flowContext() {
@@ -376,6 +390,10 @@
             if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
                 if (lastEventTimestamp != -1) {
                     long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
+                    if (maxSleepTime > 0 && timeToSleep > maxSleepTime) {
+                        timeToSleep = maxSleepTime;
+                    }
+
                     sleepSilently(timeToSleep);
                 }
                 lastEventTimestamp = event.getTimestamp();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Jun 30 12:02:17 2020 +0200
@@ -17,6 +17,7 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.*;
+import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME;
 import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
@@ -47,6 +48,8 @@
 
     private long lastEventTimestamp = -1;
 
+    private long maxSleepTime = MAX_SLEEP_TIME;
+
     public SynchFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
     }
@@ -73,6 +76,14 @@
         this.sleepFactor = sleepFactor;
     }
 
+    public long getMaxSleepTime() {
+        return maxSleepTime;
+    }
+
+    public void setMaxSleepTime(long maxSleepTime) {
+        this.maxSleepTime = maxSleepTime;
+    }
+
     @Override
     public int activeConnections() {
         int count = 0;
@@ -259,6 +270,10 @@
         if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
             if (lastEventTimestamp != -1) {
                 long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
+                if (maxSleepTime > 0 && timeToSleep > maxSleepTime) {
+                    timeToSleep = maxSleepTime;
+                }
+
                 sleepSilently(timeToSleep);
             }
             lastEventTimestamp = event.getTimestamp();
--- a/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java	Tue Jun 30 12:02:17 2020 +0200
@@ -20,6 +20,8 @@
 
     public static NodeDefinition STRING_LIST_DEF = listDef(STRING_DEF);
 
+    public static NodeDefinition LONG_DEF =  valueDefLong();
+
     public static NodeDefinition PORT_DEF = valueDefInteger().addValidator(PortValidator.INSTANCE);
 
     public static NodeDefinition INT_GREATER_THAN_ZERO_DEF = valueDefInteger().addValidator(LongValidator.GREATER_ZERO);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Tue Jun 30 12:02:17 2020 +0200
@@ -15,37 +15,49 @@
 
     private final Map<SessionInfo, HttpRequest> lastRequests;
 
+    private final boolean skipResponses;
+
     public PcapHttpListener(String sourceName, EventHandler eventHandler,
-                            boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
+                            boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum,
+                            boolean skipResponses) {
         super(sourceName, eventHandler, collectMetric, metric, HTTP);
-        lastRequests = new HashMap<>(maxSessionNum);
+        this.skipResponses = skipResponses;
+        this.lastRequests = skipResponses ? null : new HashMap<>(maxSessionNum);
     }
 
     @Override
     public void onMessageReceived(SessionContext context, HttpMessage msg, long timestamp) {
         SessionInfo session = SessionInfo.create(context, protocolId);
-        if (msg.isRequest()) {
-            HttpRequest lastRequest = lastRequests.put(session, (HttpRequest) msg);
-            if (lastRequest != null) {
-                firePayloadEvent(lastRequest, null, session, timestamp);
+        if (skipResponses) {
+            if (msg.isRequest()) {
+                firePayloadEvent(msg, null, session, timestamp);
             }
         } else {
-            HttpRequest req = lastRequests.remove(session);
-            firePayloadEvent(req, msg, session, timestamp);
+            if (msg.isRequest()) {
+                HttpRequest lastRequest = lastRequests.put(session, (HttpRequest) msg);
+                if (lastRequest != null) {
+                    firePayloadEvent(lastRequest, null, session, timestamp);
+                }
+            } else {
+                HttpRequest req = lastRequests.remove(session);
+                firePayloadEvent(req, msg, session, timestamp);
+            }
         }
     }
 
     @Override
     protected void onSessionClosed(SessionContext context, int status, long timestamp) {
         SessionInfo session = SessionInfo.create(context, protocolId);
-        HttpRequest req = lastRequests.remove(session);
-        if (req != null) {
-            firePayloadEvent(req, null, session, timestamp);
+        if (!skipResponses) {
+            HttpRequest req = lastRequests.remove(session);
+            if (req != null) {
+                firePayloadEvent(req, null, session, timestamp);
+            }
         }
     }
 
     public void onLoopEnd(long timestamp) {
-        if (!lastRequests.isEmpty()) {
+        if (!skipResponses && !lastRequests.isEmpty()) {
             lastRequests.forEach((session, req) -> {
                 firePayloadEvent(req, null, session, timestamp);
             });
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java	Tue Jun 30 12:02:17 2020 +0200
@@ -18,7 +18,8 @@
                 context.getEventHandler(),
                 context.isCollectMetric(),
                 context.getMetric(),
-                context.getTcpProcessor().getMaxSessionNumber());
+                context.getTcpProcessor().getMaxSessionNumber(),
+                skipResponses);
 
         analyzer.setListener(listener);
         tcpProc.addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionAnalyzerHook.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionAnalyzerHook.java	Tue Jun 30 12:02:17 2020 +0200
@@ -5,6 +5,16 @@
 
 public abstract class PcapSessionAnalyzerHook {
 
+    protected boolean skipResponses;
+
+    public boolean isSkipResponses() {
+        return skipResponses;
+    }
+
+    public void setSkipResponses(boolean skipResponses) {
+        this.skipResponses = skipResponses;
+    }
+
     public boolean supports(SessionAnalyzer obj) {
         Assert.notNull(obj, "object");
         return supports(obj.getClass());
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Tue Jun 30 10:56:04 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Tue Jun 30 12:02:17 2020 +0200
@@ -85,6 +85,8 @@
 
     private List<SessionAnalyzer> analyzers = new ArrayList<>();
 
+    private boolean skipResponses;
+
     public PcapSessionEventSource() {
         this.name = UniqueIdGenerator.generate();
 
@@ -216,6 +218,7 @@
         pcapFile = config.getString("fileName");
         loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS);
         loopDelay = config.getLong("loopDelay", 0L);
+        skipResponses = config.getBoolean("skipResponses", false);
 
         setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
 
@@ -253,6 +256,10 @@
             return;
         }
 
+        hooks.forEach((h) -> {
+            h.setSkipResponses(skipResponses);
+        });
+
         if (handler == null) {
             throw new IllegalStateException("Handler cannot be null.");
         } else if (pcapFile == null) {
@@ -571,6 +578,7 @@
                     tupleDef("loopDelay", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("sessionProc", sessionProcDef).setRequired(false),
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),
+                    tupleDef("skipResponses", BOOLEAN_DEF).setRequired(false),
                     tupleDef("analyzers",
                             dynaKeyValueVaryListDef("type", pf).setTransformToPluginObject(true)
                     ).setRequired(false)