changeset 1156:d9dbc633012c

FlowExecutor.sleepFactor enabled
author Devel 2
date Mon, 15 Jun 2020 08:46:19 +0200
parents 3eefcb0f98cf
children cbc27572188d
files stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java
diffstat 3 files changed, 50 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Mon Jun 15 08:07:16 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Mon Jun 15 08:46:19 2020 +0200
@@ -35,6 +35,8 @@
 
     private static final boolean DEFAULT_COLLECT_METRICS = false;
 
+    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
+
     private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class);
 
     private Emitter emitter;
@@ -49,7 +51,7 @@
 
     private volatile boolean started = false;
 
-    //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP;
+    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
 
     private String workerType = "synch";
 
@@ -115,13 +117,13 @@
         this.workersNum = workersNum;
     }
 
-/*    public float getSleepFactor() {
+    public float getSleepFactor() {
         return sleepFactor;
     }
 
     public void setSleepFactor(float sleepFactor) {
         this.sleepFactor = sleepFactor;
-    }*/
+    }
 
     public FlowWorkerDispatcher getDispatcher() {
         return dispatcher;
@@ -193,6 +195,7 @@
         connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION);
         collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
         dispatcher = config.get("dispatcher", null);
+        sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP);
     }
 
     @Override
@@ -253,9 +256,11 @@
             worker.setListener(listener);
             worker.setFilterChain(filterChain.instanceForWorker(i));
 
-//            if (worker instanceof HttpFlowBasedClientWorker) {
-//                ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor);
-//            }
+            if (worker instanceof SynchFlowWorker) {
+                ((SynchFlowWorker) worker).setSleepFactor(sleepFactor);
+            } else if (worker instanceof ParallelFlowWorker) {
+                ((ParallelFlowWorker) worker).setSleepFactor(sleepFactor);
+            }
 
             worker.setCollectMetrics(collectMetrics);
             worker.setConnectPartialSession(connectPartialSession);
@@ -316,7 +321,6 @@
                 LOGGER.debug(e.getMessage(), e);
             }
         }
-
     }
 
     private static class DispatcherTransformer implements ValueTransformer {
@@ -362,6 +366,7 @@
                     tupleDef("connectPartialSession", BOOLEAN_DEF).setRequired(false),
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),
                     tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
+                    tupleDef("sleepFactor", valueDefFloat()).setRequired(false),
                     tupleDef("dispatcher", valueDef()
                             .addValidator(new EnumValidator(DISPATCHERS, false))
                             .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Mon Jun 15 08:07:16 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Mon Jun 15 08:46:19 2020 +0200
@@ -17,6 +17,7 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.STATE_CONNECTED;
+import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
 @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
@@ -39,6 +40,8 @@
 
     private volatile boolean working;
 
+    private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
+
     public ParallelFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
     }
@@ -52,7 +55,6 @@
         this.maxSentRequests = maxSentRequests;
     }
 
-
     public long getEventsQueueWaitTime() {
         return eventsQueueWaitTime;
     }
@@ -62,6 +64,14 @@
         this.eventsQueueWaitTime = eventsQueueWaitTime;
     }
 
+    public float getSleepFactor() { // factor handed to each FlowThread this worker creates from now on
+        return sleepFactor;
+    }
+
+    public void setSleepFactor(float sleepFactor) { // FlowThread keeps its own final copy, so running threads are unaffected
+        this.sleepFactor = sleepFactor;
+    }
+
     @Override
     public boolean isWorking() {
         return working;
@@ -118,7 +128,7 @@
         flowHandler.init(flowContext);
         flowContext.client(flowHandler);
 
-        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue);
+        FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor);
         flowThread.start();
         sessions.put(session, flowThread);
         return flowThread;
@@ -253,13 +263,18 @@
 
         private final FlowProcessor flowProcessor;
 
+        private final float sleepFactor;
+
+        private long lastEventTimestamp = -1;
+
         private final BlockingQueue<Event> queue;
 
-        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize) {
+        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor) { // sleepFactor scales the timestamp gap waited in sleep(Event)
             this.emitter = emitter;
             this.flowContext = flowContext;
             flowProcessor = new FlowProcessor(this, logger, index);
             this.queue = new ArrayBlockingQueue<>(queueSize);
+            this.sleepFactor = sleepFactor; // final field: fixed for the lifetime of this thread
         }
 
         public FlowContext flowContext() {
@@ -357,6 +372,16 @@
 
         }
 
+        private void sleep(Event event) { // paces replay: waits the gap between consecutive event timestamps scaled by sleepFactor
+            if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { // SLEEP_FACTOR_NO_SLEEP (0.0f) disables pacing; timestamps are then not tracked
+                if (lastEventTimestamp != -1) { // -1 means no previous event was recorded, so the first event is never delayed
+                    long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor);
+                    sleepSilently(Math.max(0L, timeToSleep)); // clamp: out-of-order timestamps or a negative factor must not yield a negative delay (presumably ends in Thread.sleep - TODO confirm)
+                }
+                lastEventTimestamp = event.getTimestamp(); // remembered even after an out-of-order event, so the next gap is relative to it
+            }
+        }
+
         @Override
         public void run() {
             working = true;
@@ -373,6 +398,7 @@
                 }
 
                 if (event != null) {
+                    sleep(event);
                     if (trace) {
                         trace(logger, flowContext, "Event processing: {}", event);
                     }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Mon Jun 15 08:07:16 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Mon Jun 15 08:46:19 2020 +0200
@@ -18,6 +18,7 @@
 
 import static com.passus.st.client.FlowContext.STATE_CONNECTED;
 import static com.passus.st.client.FlowContext.STATE_DISCONNECTED;
+import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP;
 import static com.passus.st.client.FlowUtils.*;
 
 @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
@@ -25,8 +26,6 @@
 
     protected final Logger logger = LogManager.getLogger(getClass());
 
-    public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f;
-
     public static final String TYPE = "synch";
 
     private volatile boolean working = false;
@@ -65,6 +64,14 @@
         this.eventsQueueWaitTime = eventsQueueWaitTime;
     }
 
+    public float getSleepFactor() { // exposes the worker's current sleep factor
+        return sleepFactor;
+    }
+
+    public void setSleepFactor(float sleepFactor) { // FlowExecutor calls this while configuring each SynchFlowWorker
+        this.sleepFactor = sleepFactor;
+    }
+
     @Override
     public int activeConnections() {
         int count = 0;