changeset 1089:b1c79edc6d5e

Added FlowHandler.destroy method; changed OperationEvent.execute signature to take a FlowContext
author Devel 2
date Fri, 08 May 2020 11:06:03 +0200
parents dd193de63d88
children 99f8c3835de6
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/OperationEvent.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java
diffstat 8 files changed, 43 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Fri May 08 11:06:03 2020 +0200
@@ -29,6 +29,15 @@
     }
 
     @Override
+    public void destroy(FlowContext flowContext) {
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.deactivate();
+            }
+        }
+    }
+
+    @Override
     public FlowHandlerDataDecoder<S> getResponseDecoder(FlowContext flowContext) {
         return decoder;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Fri May 08 11:06:03 2020 +0200
@@ -16,6 +16,8 @@
 
     default void init(FlowContext flowContext) {};
 
+    default void destroy(FlowContext flowContext) {};
+
     FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext);
 
     FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Fri May 08 11:06:03 2020 +0200
@@ -129,6 +129,8 @@
             if (wait) {
                 waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout());
             }
+
+            flowContext.client().destroy(flowContext);
         } catch (InterruptedException e) {
             error(flowContext, e);
         } finally {
--- a/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java	Fri May 08 11:06:03 2020 +0200
@@ -32,6 +32,6 @@
         this.timestamp = timestamp;
     }
 
-    public abstract void execute() throws IOException;
+    public abstract void execute(FlowContext flowContext) throws IOException;
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Fri May 08 11:06:03 2020 +0200
@@ -114,9 +114,9 @@
 
         FlowContext flowContext = new FlowContext(session);
         flowContext.createLock();
-        FlowHandler client = clientFactory.create(session.getProtocolId());
-        client.init(flowContext);
-        flowContext.client(client);
+        FlowHandler flowHandler = clientFactory.create(session.getProtocolId());
+        flowHandler.init(flowContext);
+        flowContext.client(flowHandler);
 
         FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue);
         flowThread.start();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri May 08 11:06:03 2020 +0200
@@ -223,7 +223,7 @@
                 flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
             } else if (event.getType() == OperationEvent.TYPE) {
                 try {
-                    ((OperationEvent) event).execute();
+                    ((OperationEvent) event).execute(flowContext);
                 } catch (IOException e) {
                     errorInternal(flowContext, e);
                 }
--- a/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java	Fri May 08 11:06:03 2020 +0200
@@ -6,28 +6,34 @@
 
 public final class WaitForResponseEvent extends OperationEvent {
 
+    public static final WaitForResponseEvent DEFAULT = new WaitForResponseEvent();
+
     public static int TYPE = 13;
 
-    private final FlowContext flowContext;
+    private final long timeout;
 
     private final boolean closeOnTimeout;
 
-    public WaitForResponseEvent(FlowContext flowContext) {
-        this(flowContext, true);
+    public WaitForResponseEvent() {
+        this(Timeouts.DEFAULT_TIMEOUT, true);
     }
 
-    public WaitForResponseEvent(FlowContext flowContext, boolean closeOnTimeout) {
-        this.flowContext = flowContext;
+    public WaitForResponseEvent(long timeout) {
+        this(timeout, true);
+    }
+
+    public WaitForResponseEvent(long timeout, boolean closeOnTimeout) {
+        this.timeout = timeout;
         this.closeOnTimeout = closeOnTimeout;
     }
 
     @Override
-    public void execute() throws IOException {
+    public void execute(FlowContext flowContext) throws IOException {
         flowContext.lock();
         try {
             boolean ok = true;
             try {
-                ok = waitForResponse(flowContext);
+                ok = waitForResponse(flowContext, timeout);
             } catch (InterruptedException ignore) {
 
             }
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Fri May 08 09:45:09 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Fri May 08 11:06:03 2020 +0200
@@ -15,10 +15,14 @@
 
     private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class);
 
+    private WaitForResponseEvent waitForResponseEvent;
+
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
     private final MySqlDecoderContext context = new MySqlDecoderContext();
 
+    private long responseTimeout = Timeouts.DEFAULT_TIMEOUT;
+
     @Override
     protected FlowHandlerDataEncoder<MySqlPacket> createEncoder() {
         return new MySqlFlowHandlerDataEncoder(context);
@@ -40,8 +44,15 @@
     }
 
     @Override
+    public void init(FlowContext flowContext) {
+        super.init(flowContext);
+        flowContext.setBidirectional(true);
+        waitForResponseEvent = new WaitForResponseEvent(responseTimeout, true);
+    }
+
+    @Override
     public void onConnected(FlowContext flowContext) {
-        flowContext.queueAddFirst(new WaitForResponseEvent(flowContext));
+        flowContext.queueAddFirst(waitForResponseEvent);
     }
 
     @Override
@@ -97,10 +108,5 @@
         this.timeGenerator = timeGenerator;
     }
 
-    @Override
-    public void init(FlowContext flowContext) {
-        flowContext.setBidirectional(true);
-    }
-
 
 }
\ No newline at end of file