Mercurial > stress-tester
changeset 1240:f082bcb89613
FlowExecutor.disconnectAllOnDataLoop
author | Devel 2 |
---|---|
date | Tue, 30 Jun 2020 15:52:22 +0200 |
parents | ce9ac16deb77 |
children | f1a454e020e5 |
files | stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java |
diffstat | 3 files changed, 72 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Tue Jun 30 12:11:50 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Tue Jun 30 15:52:22 2020 +0200 @@ -34,8 +34,11 @@ private static final boolean DEFAULT_COLLECT_METRICS = false; - public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; - public static final long MAX_SLEEP_TIME = 0; + private static final boolean DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP = true; + + public static final float DEFAULT_SLEEP_FACTOR_NO_SLEEP = 0.0f; + + public static final long DEFAULT_MAX_SLEEP_TIME = 0; private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class); @@ -51,9 +54,9 @@ private volatile boolean started = false; - private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP; - private long maxSleepTime = MAX_SLEEP_TIME; + private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME; private String workerType = "synch"; @@ -63,6 +66,8 @@ private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION; + private boolean disconnectAllOnDataLoop = DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP; + private FlowWorkerDispatcher dispatcher; public Emitter getEmitter() { @@ -135,6 +140,14 @@ this.maxSleepTime = maxSleepTime; } + public boolean isDisconnectAllOnDataLoop() { + return disconnectAllOnDataLoop; + } + + public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { + this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; + } + public FlowWorkerDispatcher getDispatcher() { return dispatcher; } @@ -204,9 +217,10 @@ setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM)); connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION); collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS); + disconnectAllOnDataLoop = config.getBoolean("disconnectAllOnDataLoop", DEFAULT_DISCONNECT_ALL_ON_DATA_LOOP); dispatcher = config.get("dispatcher", null); - sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP); - maxSleepTime = config.getLong("maxSleepTime", MAX_SLEEP_TIME); + sleepFactor = config.getFloat("sleepFactor", DEFAULT_SLEEP_FACTOR_NO_SLEEP); + maxSleepTime = config.getLong("maxSleepTime", DEFAULT_MAX_SLEEP_TIME); } @Override @@ -271,10 +285,12 @@ SynchFlowWorker synchWorker = (SynchFlowWorker) worker; synchWorker.setSleepFactor(sleepFactor); synchWorker.setMaxSleepTime(maxSleepTime); + synchWorker.setDisconnectAllOnDataLoop(disconnectAllOnDataLoop); } else if (worker instanceof ParallelFlowWorker) { ParallelFlowWorker parallelWorker = (ParallelFlowWorker) worker; parallelWorker.setSleepFactor(sleepFactor); parallelWorker.setMaxSleepTime(maxSleepTime); + parallelWorker.setDisconnectAllOnDataLoop(disconnectAllOnDataLoop); } worker.setCollectMetrics(collectMetrics); @@ -380,6 +396,7 @@ return mapDef( tupleDef("connectPartialSession", BOOLEAN_DEF).setRequired(false), tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false), + tupleDef("disconnectAllOnDataLoop", BOOLEAN_DEF).setRequired(false), tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false), tupleDef("sleepFactor", valueDefFloat()).setRequired(false), tupleDef("maxSleepTime", LONG_DEF).setRequired(false),
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Jun 30 12:11:50 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Jun 30 15:52:22 2020 +0200 @@ -17,8 +17,8 @@ import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.STATE_CONNECTED; -import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME; -import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; +import static com.passus.st.client.FlowExecutor.DEFAULT_MAX_SLEEP_TIME; +import static com.passus.st.client.FlowExecutor.DEFAULT_SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) @@ -41,9 +41,11 @@ private volatile boolean working; - private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP; - private long maxSleepTime = MAX_SLEEP_TIME; + private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME; + + private boolean disconnectAllOnDataLoop = true; public ParallelFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); @@ -83,6 +85,14 @@ this.maxSleepTime = maxSleepTime; } + public boolean isDisconnectAllOnDataLoop() { + return disconnectAllOnDataLoop; + } + + public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { + this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; + } + @Override public boolean isWorking() { return working; @@ -139,7 +149,8 @@ flowHandler.init(flowContext); flowContext.client(flowHandler); - FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor, maxSleepTime); + FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, + sleepFactor, maxSleepTime, disconnectAllOnDataLoop); flowThread.start(); sessions.put(session, flowThread); return flowThread; @@ -282,13 +293,16 @@ private final BlockingQueue<Event> queue; - private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime) { + private final boolean disconnectAllOnDataLoop; + + private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor, long maxSleepTime, boolean disconnectAllOnDataLoop) { this.emitter = emitter; this.flowContext = flowContext; flowProcessor = new FlowProcessor(this, logger, index); this.queue = new ArrayBlockingQueue<>(queueSize); this.sleepFactor = sleepFactor; this.maxSleepTime = maxSleepTime; + this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; } public FlowContext flowContext() { @@ -387,7 +401,7 @@ } private void sleep(Event event) { - if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (sleepFactor != DEFAULT_SLEEP_FACTOR_NO_SLEEP) { if (lastEventTimestamp != -1) { long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); if (maxSleepTime > 0 && timeToSleep > maxSleepTime) { @@ -447,6 +461,14 @@ debug(logger, flowContext, "DataLoopEnd received."); } + if (disconnectAllOnDataLoop) { + finish(); + } + } else if (event.getType() == DataEvents.DataEnd.TYPE) { + if (logger.isDebugEnabled()) { + logger.debug("DataEnd received. Deactivation."); + } + finish(); } }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Jun 30 12:11:50 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Jun 30 15:52:22 2020 +0200 @@ -17,8 +17,8 @@ import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.*; -import static com.passus.st.client.FlowExecutor.MAX_SLEEP_TIME; -import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; +import static com.passus.st.client.FlowExecutor.DEFAULT_MAX_SLEEP_TIME; +import static com.passus.st.client.FlowExecutor.DEFAULT_SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) @@ -44,11 +44,13 @@ private FlowProcessor flowProcessor; - private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + private float sleepFactor = DEFAULT_SLEEP_FACTOR_NO_SLEEP; private long lastEventTimestamp = -1; - private long maxSleepTime = MAX_SLEEP_TIME; + private long maxSleepTime = DEFAULT_MAX_SLEEP_TIME; + + private boolean disconnectAllOnDataLoop = true; public SynchFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); @@ -84,6 +86,14 @@ this.maxSleepTime = maxSleepTime; } + public boolean isDisconnectAllOnDataLoop() { + return disconnectAllOnDataLoop; + } + + public void setDisconnectAllOnDataLoop(boolean disconnectAllOnDataLoop) { + this.disconnectAllOnDataLoop = disconnectAllOnDataLoop; + } + @Override public int activeConnections() { int count = 0; @@ -267,7 +277,7 @@ } protected void sleep(Event event) { - if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (sleepFactor != DEFAULT_SLEEP_FACTOR_NO_SLEEP) { if (lastEventTimestamp != -1) { long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); if (maxSleepTime > 0 && timeToSleep > maxSleepTime) { @@ -327,13 +337,17 @@ logger.debug("DataLoopEnd received."); } - disconnectAll(); + if (disconnectAllOnDataLoop) { + disconnectAll(); + } + filterChain.reset(); } else if (event.getType() == DataEvents.DataEnd.TYPE) { if (logger.isDebugEnabled()) { logger.debug("DataEnd received. Deactivation."); } + disconnectAll(); eventsQueue.clear(); working = false; }