changeset 1254:9e0796bd75b1

ParallelFlowWorker bugfix
author Devel 2
date Thu, 02 Jul 2020 09:41:32 +0200
parents 0a0631e1b289
children b4b1febc1058
files stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java
diffstat 1 files changed, 31 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Jul 02 09:22:29 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Jul 02 09:41:32 2020 +0200
@@ -6,6 +6,7 @@
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.filter.FlowFilterChain;
+import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,8 @@
 
     protected final Map<SessionInfo, FlowThread> sessions = new ConcurrentHashMap<>();
 
+    private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer(); // filled in onDisconnected, drained and cleared in writeMetrics
+
     private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS;
 
     private long eventsQueueWaitTime = 100;
@@ -94,6 +97,26 @@
     }
 
     /**
      * Writes the superclass metrics into {@code container} and, when {@code collectMetrics} is set,
      * also writes the metrics of the client of every entry in {@code sessions} and copies any
      * metrics held in {@code deferredMetrics}. Each deferred metric is reset after it has been
      * passed to {@code container.update}, and {@code deferredMetrics} is cleared afterwards.
      *
      * NOTE(review): {@code deferredMetrics} is also written from {@code onDisconnected}; confirm
      * whether the two methods can run on different threads and whether the container tolerates
      * that, otherwise a metric written between the copy loop and {@code clear()} could be dropped.
      *
      * @param container destination that receives the metrics
      */
     @Override
+    public void writeMetrics(MetricsContainer container) {
+        super.writeMetrics(container);
+
+        if (collectMetrics) {
+            sessions.forEach((s, t) -> { // only sessions still present in the map are visited here
+                t.flowContext.client().writeMetrics(container);
+            });
+
+            if (!deferredMetrics.isEmpty()) { // metrics stored by onDisconnected
+                deferredMetrics.getMetrics().forEach(m -> {
+                    container.update(m);
+                    m.reset();
+                });
+
+                deferredMetrics.clear();
+            }
+        }
+    }
+
+    @Override
     public boolean isWorking() {
         // Exposes the current value of the working flag without any further checks.
         return working;
     }
@@ -147,6 +170,7 @@
         flowContext.createLock();
         FlowHandler flowHandler = clientFactory.create(session.getProtocolId());
         flowHandler.init(flowContext);
+        flowHandler.setCollectMetrics(collectMetrics);
         flowContext.client(flowHandler);
 
         FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue,
@@ -370,6 +394,13 @@
         }
 
         /**
          * When {@code collectMetrics} is set, writes the metrics of the context's flow handler
          * into {@code deferredMetrics}; {@code writeMetrics} later copies them into its container.
          *
          * NOTE(review): {@code writeMetrics} reaches the handler through
          * {@code flowContext.client()} while this method reads the {@code flowHandler} field;
          * confirm both refer to the same object.
          *
          * @param flowContext context whose {@code flowHandler} supplies the metrics
          */
         @Override
+        public void onDisconnected(FlowContext flowContext) {
+            if (collectMetrics) {
+                flowContext.flowHandler.writeMetrics(deferredMetrics);
+            }
+        }
+
+        @Override
         public void onResponseReceived(FlowContext flowContext, Object response) {
             // Hands the request taken from flowContext.sentEvent, the received response and the context to fireResponseReceived.
             fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext);
         }