Mercurial > stress-tester
changeset 1253:0a0631e1b289
ParallelFlowWorker bugfixes
author | Devel 2 |
---|---|
date | Thu, 02 Jul 2020 09:22:29 +0200 |
parents | aa5ba90755dd |
children | 9e0796bd75b1 |
files | stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java |
diffstat | 1 files changed, 17 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Jul 02 08:37:25 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Jul 02 09:22:29 2020 +0200 @@ -150,7 +150,7 @@ flowContext.client(flowHandler); FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, - sleepFactor, maxSleepTime, disconnectAllOnDataLoop); + sleepFactor, maxSleepTime, disconnectAllOnDataLoop, collectMetrics, metric); flowThread.start(); sessions.put(session, flowThread); return flowThread; @@ -200,6 +200,7 @@ logger.trace("Event processing: {}", event); } + if (event instanceof SessionEvent) { switch (event.getType()) { case SessionStatusEvent.TYPE: { @@ -221,10 +222,12 @@ SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; FlowThread flowThread = flowThread(payloadEvent); - if (connectPartialSession) { + if (flowThread == null && connectPartialSession) { SessionInfo sessionInfo = payloadEvent.getSessionInfo(); flowThread = register(sessionInfo); - flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo)); + if (flowThread != null) { + flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo)); + } } if (flowThread != null) { @@ -295,14 +298,21 @@ private final boolean disconnectAllOnDataLoop; - private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime, boolean disconnectAllOnDataLoop) { + private final boolean collectMetrics; + + private final FlowMetric metric; + + private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime, + boolean disconnectAllOnDataLoop, boolean collectMetrics, FlowMetric metric) { this.emitter = emitter; this.flowContext = flowContext; - flowProcessor = new FlowProcessor(this, logger, index); this.queue = new ArrayBlockingQueue<>(queueSize); this.sleepFactor = sleepFactor; this.maxSleepTime = maxSleepTime; this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; + this.collectMetrics = collectMetrics; + this.metric = metric; + flowProcessor = new FlowProcessor(this, logger, index); } public FlowContext flowContext() { @@ -351,12 +361,12 @@ @Override public boolean isCollectMetrics() { - return false; + return collectMetrics; } @Override public FlowMetric getFlowMetric() { - return null; + return metric; } @Override