Mercurial > stress-tester
changeset 1089:b1c79edc6d5e
FlowHandler.destroy method, OperationEvent.execute - signature changed
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Fri May 08 11:06:03 2020 +0200 @@ -29,6 +29,15 @@ } @Override + public void destroy(FlowContext flowContext) { + if (collectMetrics) { + synchronized (metric) { + metric.deactivate(); + } + } + } + + @Override public FlowHandlerDataDecoder<S> getResponseDecoder(FlowContext flowContext) { return decoder; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Fri May 08 11:06:03 2020 +0200 @@ -16,6 +16,8 @@ default void init(FlowContext flowContext) {}; + default void destroy(FlowContext flowContext) {}; + FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext); FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Fri May 08 11:06:03 2020 +0200 @@ -129,6 +129,8 @@ if (wait) { waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); } + + flowContext.client().destroy(flowContext); } catch (InterruptedException e) { error(flowContext, e); } finally {
--- a/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java Fri May 08 11:06:03 2020 +0200 @@ -32,6 +32,6 @@ this.timestamp = timestamp; } - public abstract void execute() throws IOException; + public abstract void execute(FlowContext flowContext) throws IOException; }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Fri May 08 11:06:03 2020 +0200 @@ -114,9 +114,9 @@ FlowContext flowContext = new FlowContext(session); flowContext.createLock(); - FlowHandler client = clientFactory.create(session.getProtocolId()); - client.init(flowContext); - flowContext.client(client); + FlowHandler flowHandler = clientFactory.create(session.getProtocolId()); + flowHandler.init(flowContext); + flowContext.client(flowHandler); FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue); flowThread.start();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri May 08 11:06:03 2020 +0200 @@ -223,7 +223,7 @@ flowProcessor.send(flowContext, (SessionPayloadEvent) event, true); } else if (event.getType() == OperationEvent.TYPE) { try { - ((OperationEvent) event).execute(); + ((OperationEvent) event).execute(flowContext); } catch (IOException e) { errorInternal(flowContext, e); }
--- a/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java Fri May 08 11:06:03 2020 +0200 @@ -6,28 +6,34 @@ public final class WaitForResponseEvent extends OperationEvent { + public static final WaitForResponseEvent DEFAULT = new WaitForResponseEvent(); + public static int TYPE = 13; - private final FlowContext flowContext; + private final long timeout; private final boolean closeOnTimeout; - public WaitForResponseEvent(FlowContext flowContext) { - this(flowContext, true); + public WaitForResponseEvent() { + this(Timeouts.DEFAULT_TIMEOUT, true); } - public WaitForResponseEvent(FlowContext flowContext, boolean closeOnTimeout) { - this.flowContext = flowContext; + public WaitForResponseEvent(long timeout) { + this(timeout, true); + } + + public WaitForResponseEvent(long timeout, boolean closeOnTimeout) { + this.timeout = timeout; this.closeOnTimeout = closeOnTimeout; } @Override - public void execute() throws IOException { + public void execute(FlowContext flowContext) throws IOException { flowContext.lock(); try { boolean ok = true; try { - ok = waitForResponse(flowContext); + ok = waitForResponse(flowContext, timeout); } catch (InterruptedException ignore) { }
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Fri May 08 09:45:09 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Fri May 08 11:06:03 2020 +0200 @@ -15,10 +15,14 @@ private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class); + private WaitForResponseEvent waitForResponseEvent; + TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); private final MySqlDecoderContext context = new MySqlDecoderContext(); + private long responseTimeout = Timeouts.DEFAULT_TIMEOUT; + @Override protected FlowHandlerDataEncoder<MySqlPacket> createEncoder() { return new MySqlFlowHandlerDataEncoder(context); @@ -40,8 +44,15 @@ } @Override + public void init(FlowContext flowContext) { + super.init(flowContext); + flowContext.setBidirectional(true); + waitForResponseEvent = new WaitForResponseEvent(responseTimeout, true); + } + + @Override public void onConnected(FlowContext flowContext) { - flowContext.queueAddFirst(new WaitForResponseEvent(flowContext)); + flowContext.queueAddFirst(waitForResponseEvent); } @Override @@ -97,10 +108,5 @@ this.timeGenerator = timeGenerator; } - @Override - public void init(FlowContext flowContext) { - flowContext.setBidirectional(true); - } - } \ No newline at end of file