Mercurial > stress-tester
changeset 1156:d9dbc633012c
FlowExecutor.sleepFactor enabled
author | Devel 2 |
---|---|
date | Mon, 15 Jun 2020 08:46:19 +0200 |
parents | 3eefcb0f98cf |
children | cbc27572188d |
files | stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java |
diffstat | 3 files changed, 50 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Mon Jun 15 08:07:16 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Mon Jun 15 08:46:19 2020 +0200 @@ -35,6 +35,8 @@ private static final boolean DEFAULT_COLLECT_METRICS = false; + public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; + private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class); private Emitter emitter; @@ -49,7 +51,7 @@ private volatile boolean started = false; - //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP; + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; private String workerType = "synch"; @@ -115,13 +117,13 @@ this.workersNum = workersNum; } -/* public float getSleepFactor() { + public float getSleepFactor() { return sleepFactor; } public void setSleepFactor(float sleepFactor) { this.sleepFactor = sleepFactor; - }*/ + } public FlowWorkerDispatcher getDispatcher() { return dispatcher; @@ -193,6 +195,7 @@ connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION); collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS); dispatcher = config.get("dispatcher", null); + sleepFactor = config.getFloat("sleepFactor", SLEEP_FACTOR_NO_SLEEP); } @Override @@ -253,9 +256,11 @@ worker.setListener(listener); worker.setFilterChain(filterChain.instanceForWorker(i)); -// if (worker instanceof HttpFlowBasedClientWorker) { -// ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor); -// } + if (worker instanceof SynchFlowWorker) { + ((SynchFlowWorker) worker).setSleepFactor(sleepFactor); + } else if (worker instanceof ParallelFlowWorker) { + ((ParallelFlowWorker) worker).setSleepFactor(sleepFactor); + } worker.setCollectMetrics(collectMetrics); worker.setConnectPartialSession(connectPartialSession); @@ -316,7 +321,6 @@ LOGGER.debug(e.getMessage(), e); } } - } private static class DispatcherTransformer implements ValueTransformer { @@ -362,6 +366,7 @@ tupleDef("connectPartialSession", BOOLEAN_DEF).setRequired(false), tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false), tupleDef("workers", INT_GREATER_THAN_ZERO_DEF).setRequired(false), + tupleDef("sleepFactor", valueDefFloat()).setRequired(false), tupleDef("dispatcher", valueDef() .addValidator(new EnumValidator(DISPATCHERS, false)) .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Mon Jun 15 08:07:16 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Mon Jun 15 08:46:19 2020 +0200 @@ -17,6 +17,7 @@ import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.STATE_CONNECTED; +import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) @@ -39,6 +40,8 @@ private volatile boolean working; + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + public ParallelFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); } @@ -52,7 +55,6 @@ this.maxSentRequests = maxSentRequests; } - public long getEventsQueueWaitTime() { return eventsQueueWaitTime; } @@ -62,6 +64,14 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } + public float getSleepFactor() { + return sleepFactor; + } + + public void setSleepFactor(float sleepFactor) { + this.sleepFactor = sleepFactor; + } + @Override public boolean isWorking() { return working; @@ -118,7 +128,7 @@ flowHandler.init(flowContext); flowContext.client(flowHandler); - FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue); + FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue, sleepFactor); flowThread.start(); sessions.put(session, flowThread); return flowThread; @@ -253,13 +263,18 @@ private final FlowProcessor flowProcessor; + private final float sleepFactor; + + private long lastEventTimestamp = -1; + private final BlockingQueue<Event> queue; - private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize) { + private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize, float sleepFactor) { this.emitter = emitter; this.flowContext = flowContext; flowProcessor = new FlowProcessor(this, logger, index); this.queue = new ArrayBlockingQueue<>(queueSize); + this.sleepFactor = sleepFactor; } public FlowContext flowContext() { @@ -357,6 +372,16 @@ } + private void sleep(Event event) { + if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (lastEventTimestamp != -1) { + long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + sleepSilently(timeToSleep); + } + lastEventTimestamp = event.getTimestamp(); + } + } + @Override public void run() { working = true; @@ -373,6 +398,7 @@ } if (event != null) { + sleep(event); if (trace) { trace(logger, flowContext, "Event processing: {}", event); }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon Jun 15 08:07:16 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon Jun 15 08:46:19 2020 +0200 @@ -18,6 +18,7 @@ import static com.passus.st.client.FlowContext.STATE_CONNECTED; import static com.passus.st.client.FlowContext.STATE_DISCONNECTED; +import static com.passus.st.client.FlowExecutor.SLEEP_FACTOR_NO_SLEEP; import static com.passus.st.client.FlowUtils.*; @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) @@ -25,8 +26,6 @@ protected final Logger logger = LogManager.getLogger(getClass()); - public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; - public static final String TYPE = "synch"; private volatile boolean working = false; @@ -65,6 +64,14 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } + public float getSleepFactor() { + return sleepFactor; + } + + public void setSleepFactor(float sleepFactor) { + this.sleepFactor = sleepFactor; + } + @Override public int activeConnections() { int count = 0;