Mercurial > stress-tester
changeset 1254:9e0796bd75b1
ParallelFlowWorker bugfix
author | Devel 2 |
---|---|
date | Thu, 02 Jul 2020 09:41:32 +0200 |
parents | 0a0631e1b289 |
children | b4b1febc1058 |
files | stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java |
diffstat | 1 files changed, 31 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Jul 02 09:22:29 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Jul 02 09:41:32 2020 +0200 @@ -6,6 +6,7 @@ import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; import com.passus.st.filter.FlowFilterChain; +import com.passus.st.metric.MetricsContainer; import com.passus.st.plugin.PluginConstants; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,6 +34,8 @@ protected final Map<SessionInfo, FlowThread> sessions = new ConcurrentHashMap<>(); + private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer(); + private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS; private long eventsQueueWaitTime = 100; @@ -94,6 +97,26 @@ } @Override + public void writeMetrics(MetricsContainer container) { + super.writeMetrics(container); + + if (collectMetrics) { + sessions.forEach((s, t) -> { + t.flowContext.client().writeMetrics(container); + }); + + if (!deferredMetrics.isEmpty()) { + deferredMetrics.getMetrics().forEach(m -> { + container.update(m); + m.reset(); + }); + + deferredMetrics.clear(); + } + } + } + + @Override public boolean isWorking() { return working; } @@ -147,6 +170,7 @@ flowContext.createLock(); FlowHandler flowHandler = clientFactory.create(session.getProtocolId()); flowHandler.init(flowContext); + flowHandler.setCollectMetrics(collectMetrics); flowContext.client(flowHandler); FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, @@ -370,6 +394,13 @@ } @Override + public void onDisconnected(FlowContext flowContext) { + if (collectMetrics) { + flowContext.flowHandler.writeMetrics(deferredMetrics); + } + } + + @Override public void onResponseReceived(FlowContext flowContext, Object response) { fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext); }