changeset 1253:0a0631e1b289

ParallelFlowWorker bugfixes
author Devel 2
date Thu, 02 Jul 2020 09:22:29 +0200
parents aa5ba90755dd
children 9e0796bd75b1
files stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java
diffstat 1 files changed, 17 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Jul 02 08:37:25 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Jul 02 09:22:29 2020 +0200
@@ -150,7 +150,7 @@
         flowContext.client(flowHandler);
 
         FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue,
-                sleepFactor, maxSleepTime, disconnectAllOnDataLoop);
+                sleepFactor, maxSleepTime, disconnectAllOnDataLoop, collectMetrics, metric);
         flowThread.start();
         sessions.put(session, flowThread);
         return flowThread;
@@ -200,6 +200,7 @@
             logger.trace("Event processing: {}", event);
         }
 
+
         if (event instanceof SessionEvent) {
             switch (event.getType()) {
                 case SessionStatusEvent.TYPE: {
@@ -221,10 +222,12 @@
                     SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
                     FlowThread flowThread = flowThread(payloadEvent);
 
-                    if (connectPartialSession) {
+                    if (flowThread == null && connectPartialSession) {
                         SessionInfo sessionInfo = payloadEvent.getSessionInfo();
                         flowThread = register(sessionInfo);
-                        flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo));
+                        if (flowThread != null) {
+                            flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo));
+                        }
                     }
 
                     if (flowThread != null) {
@@ -295,14 +298,21 @@
 
         private final boolean disconnectAllOnDataLoop;
 
-        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime, boolean disconnectAllOnDataLoop) {
+        private final boolean collectMetrics;
+
+        private final FlowMetric metric;
+
+        private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime,
+                           boolean disconnectAllOnDataLoop, boolean collectMetrics, FlowMetric metric) {
             this.emitter = emitter;
             this.flowContext = flowContext;
-            flowProcessor = new FlowProcessor(this, logger, index);
             this.queue = new ArrayBlockingQueue<>(queueSize);
             this.sleepFactor = sleepFactor;
             this.maxSleepTime = maxSleepTime;
             this.disconnectAllOnDataLoop = disconnectAllOnDataLoop;
+            this.collectMetrics = collectMetrics;
+            this.metric = metric;
+            flowProcessor = new FlowProcessor(this, logger, index);
         }
 
         public FlowContext flowContext() {
@@ -351,12 +361,12 @@
 
         @Override
         public boolean isCollectMetrics() {
-            return false;
+            return collectMetrics;
         }
 
         @Override
         public FlowMetric getFlowMetric() {
-            return null;
+            return metric;
         }
 
         @Override