changeset 1240:f082bcb89613

FlowExecutor.disconnectAllOnDataLoop
author Devel 2
date Tue, 30 Jun 2020 15:52:22 +0200
parents ce9ac16deb77
children f1a454e020e5
files stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java
diffstat 3 files changed, 72 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Tue Jun 30 12:11:50 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Tue Jun 30 15:52:22 2020 +0200
@@ -34,8 +34,11 @@
 
     private static final boolean DEFAULT_COLLECT_METRICS = false;
 
-    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
-    public static final long MAX_SLEEP_TIME = 0;
+    private static final boolean DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP = true;
+
+    public static final float DEFAULT_SLEEP_FACTOR_NO_SLEEP = 0.0f; // Default sleepFactor; the workers' sleep(Event) also compares against it to skip sleeping.
+
+    public static final long DEFAULT_MAX_SLEEP_TIME = 0;
 
     private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class);
 
@@ -51,9 +54,9 @@
 
     private volatile boolean started = false;
 
-    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+    private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP;
 
-    private long maxSleepTime = MAX_SLEEP_TIME;
+    private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME;
 
     private String workerType = "synch";
 
@@ -63,6 +66,8 @@
 
     private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION;
 
+    private boolean disconnectAllOnDataLoop = DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP;
+
     private FlowWorkerDispatcher dispatcher;
 
     public Emitter getEmitter() {
@@ -135,6 +140,14 @@
         this.maxSleepTime = maxSleepTime;
     }
 
+    public boolean isDisconnectAllOnDataLoop() { // Flag handed to SynchFlowWorker / ParallelFlowWorker instances when they are set up.
+        return disconnectAllOnDataLoop;
+    }
+
+    public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { // Programmatic counterpart of the "disconnectAllOnDataLoop" config key.
+        this.disconnectAllOnDataLoop = disconnectAllOnDataLoop;
+    }
+
     public FlowWorkerDispatcher getDispatcher() {
         return dispatcher;
     }
@@ -204,9 +217,10 @@
         setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM));
         connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION);
         collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
+        disconnectAllOnDataLoop = config.getBoolean("disconnectAllOnDataLoop", DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP);
         dispatcher = config.get("dispatcher", null);
-        sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP);
-        maxSleepTime = config.getLong("maxSleepTime", MAX_SLEEP_TIME);
+        sleepFactor = config.getFloat("sleepFactor", DEFAULT_SLEEP_FACTOR_NO_SLEEP);
+        maxSleepTime = config.getLong("maxSleepTime", DEFAULT_MAX_SLEEP_TIME);
     }
 
     @Override
@@ -271,10 +285,12 @@
                 SynchFlowWorker synchWorker = (SynchFlowWorker) worker;
                 synchWorker.setSleepFactor(sleepFactor);
                 synchWorker.setMaxSleepTime(maxSleepTime);
+                synchWorker.setDisconnectAllOnDataLoop(disconnectAllOnDataLoop);
             } else if (worker instanceof ParallelFlowWorker) {
                 ParallelFlowWorker parallelWorker = (ParallelFlowWorker) worker;
                 parallelWorker.setSleepFactor(sleepFactor);
                 parallelWorker.setMaxSleepTime(maxSleepTime);
+                parallelWorker.setDisconnectAllOnDataLoop(disconnectAllOnDataLoop);
             }
 
             worker.setCollectMetrics(collectMetrics);
@@ -380,6 +396,7 @@
             return mapDef(
                     tupleDef("connectPartialSession", BOOLEAN_DEF).setRequired(false),
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),
+                    tupleDef("disconnectAllOnDataLoop", BOOLEAN_DEF).setRequired(false),
                     tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
                     tupleDef("sleepFactor", valueDefFloat()).setRequired(false),
                     tupleDef("maxSleepTime", LONG_DEF).setRequired(false),
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Jun 30 12:11:50 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Tue Jun 30 15:52:22 2020 +0200
@@ -17,8 +17,8 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.STATE_CONNECTED;
-import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME;
-import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
+import static com.passus.st.client.FlowExecutor.DEFAULT_MAX_SLEEP_TIME;
+import static com.passus.st.client.FlowExecutor.DEFAULT_SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
 @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
@@ -41,9 +41,11 @@
 
     private volatile boolean working;
 
-    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+    private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP;
 
-    private long maxSleepTime = MAX_SLEEP_TIME;
+    private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME;
+
+    private boolean disconnectAllOnDataLoop = true; // NOTE(review): literal repeats FlowExecutor's private DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP; keep the two in sync.
 
     public ParallelFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
@@ -83,6 +85,14 @@
         this.maxSleepTime = maxSleepTime;
     }
 
+    public boolean isDisconnectAllOnDataLoop() { // Value passed to the FlowThread constructor when this worker creates a thread.
+        return disconnectAllOnDataLoop;
+    }
+
+    public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { // Only reaches FlowThreads created afterwards; each thread stores its own final copy.
+        this.disconnectAllOnDataLoop = disconnectAllOnDataLoop;
+    }
+
     @Override
     public boolean isWorking() {
         return working;
@@ -139,7 +149,8 @@
         flowHandler.init(flowContext);
         flowContext.client(flowHandler);
 
-        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor, maxSleepTime);
+        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue,
+                sleepFactor, maxSleepTime, disconnectAllOnDataLoop);
         flowThread.start();
         sessions.put(session, flowThread);
         return flowThread;
@@ -282,13 +293,16 @@
 
         private final BlockingQueue<Event> queue;
 
-        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime) {
+        private final boolean disconnectAllOnDataLoop;
+
+        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime, boolean disconnectAllOnDataLoop) {
             this.emitter = emitter;
             this.flowContext = flowContext;
             flowProcessor = new FlowProcessor(this, logger, index);
             this.queue = new ArrayBlockingQueue<>(queueSize);
             this.sleepFactor = sleepFactor;
             this.maxSleepTime = maxSleepTime;
+            this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; // Assigned once to a final field; later setter calls on the worker do not change it.
         }
 
         public FlowContext flowContext() {
@@ -387,7 +401,7 @@
         }
 
         private void sleep(Event event) {
-            if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
+            if (sleepFactor != DEFAULT_SLEEP_FACTOR_NO_SLEEP) {
                 if (lastEventTimestamp != -1) {
                     long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
                     if (maxSleepTime > 0 && timeToSleep > maxSleepTime) {
@@ -447,6 +461,14 @@
                             debug(logger, flowContext, "DataLoopEnd received.");
                         }
 
+                        if (disconnectAllOnDataLoop) {
+                            finish();
+                        }
+                    } else if (event.getType() == DataEvents.DataEnd.TYPE) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("DataEnd received. Deactivation.");
+                        }
+
                         finish();
                     }
                 }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Jun 30 12:11:50 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue Jun 30 15:52:22 2020 +0200
@@ -17,8 +17,8 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.*;
-import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME;
-import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
+import static com.passus.st.client.FlowExecutor.DEFAULT_MAX_SLEEP_TIME;
+import static com.passus.st.client.FlowExecutor.DEFAULT_SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
 @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
@@ -44,11 +44,13 @@
 
     private FlowProcessor flowProcessor;
 
-    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+    private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP;
 
     private long lastEventTimestamp = -1;
 
-    private long maxSleepTime = MAX_SLEEP_TIME;
+    private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME;
+
+    private boolean disconnectAllOnDataLoop = true; // NOTE(review): literal repeats FlowExecutor's private DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP; keep the two in sync.
 
     public SynchFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
@@ -84,6 +86,14 @@
         this.maxSleepTime = maxSleepTime;
     }
 
+    public boolean isDisconnectAllOnDataLoop() { // Checked in the DataLoopEnd branch to decide whether disconnectAll() runs.
+        return disconnectAllOnDataLoop;
+    }
+
+    public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { // false skips disconnectAll() on DataLoopEnd; the DataEnd branch still calls it.
+        this.disconnectAllOnDataLoop = disconnectAllOnDataLoop;
+    }
+
     @Override
     public int activeConnections() {
         int count = 0;
@@ -267,7 +277,7 @@
     }
 
     protected void sleep(Event event) {
-        if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
+        if (sleepFactor != DEFAULT_SLEEP_FACTOR_NO_SLEEP) {
             if (lastEventTimestamp != -1) {
                 long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
                 if (maxSleepTime > 0 && timeToSleep > maxSleepTime) {
@@ -327,13 +337,17 @@
                 logger.debug("DataLoopEnd received.");
             }
 
-            disconnectAll();
+            if (disconnectAllOnDataLoop) {
+                disconnectAll();
+            }
+
             filterChain.reset();
         } else if (event.getType() == DataEvents.DataEnd.TYPE) {
             if (logger.isDebugEnabled()) {
                 logger.debug("DataEnd received. Deactivation.");
             }
 
+            disconnectAll();
             eventsQueue.clear();
             working = false;
         }