changeset 1032:d136672f267c

FlowWorkerBase refactorization in progress
author Devel 2
date Thu, 02 Apr 2020 15:34:59 +0200
parents 37d098b33b23
children 386815ce52ee
files stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java
diffstat 4 files changed, 47 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Thu Apr 02 14:02:38 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Thu Apr 02 15:34:59 2020 +0200
@@ -27,11 +27,12 @@
     public static final byte CODE_IO_ERROR = 13;
     public static final byte CODE_MAX_ENCODER_ERRORS_REACHED = 14;
     public static final byte CODE_MAX_DECODER_ERRORS_REACHED = 15;
-    public static final byte CODE_IDE_TIMEOUT = 16;
+    public static final byte CODE_MAX_SEND_ERRORS_REACHED = 16;
+    public static final byte CODE_IDE_TIMEOUT = 17;
 
-    public static final byte CODE_SSL_ERROR = 17;
-    public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18;
-    public static final byte CODE_SSL_HANDSHAKE_ERROR = 19;
+    public static final byte CODE_SSL_ERROR = 18;
+    public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 19;
+    public static final byte CODE_SSL_HANDSHAKE_ERROR = 20;
 
     public static final byte CODE_INTERNAL_ERROR = (byte) 255;
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 02 14:02:38 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 02 15:34:59 2020 +0200
@@ -41,6 +41,8 @@
 
     private int maxEncoderErrors = 3;
 
+    private int maxSendErrors = 3;
+
     private long reconnectDelay = 1000;
 
     private float sleepFactor = SLEEP_FACTOR_NO_SLEEP;
@@ -112,6 +114,14 @@
         this.maxEncoderErrors = maxEncoderErrors;
     }
 
+    public int getMaxSendErrors() {
+        return maxSendErrors;
+    }
+
+    public void setMaxSendErrors(int maxSendErrors) {
+        this.maxSendErrors = maxSendErrors;
+    }
+
     protected void flowStateChanged(FlowContext context, int oldState) {
 
     }
@@ -257,7 +267,6 @@
             debug(flowContext, "Disconnect.");
         }
 
-        SessionInfo sessionInfo = null;
         long now = timeGenerator.currentTimeMillis();
         flowContext.lock.lock();
         try {
@@ -266,10 +275,6 @@
                 return;
             }
 
-            /*if (removeFlow) {
-                sessionInfo = flowContext.session;
-            }*/
-
             flowContext.state = STATE_DISCONNECTING;
             flowContext.timeout = now + timeouts.getDisconnectingTimeout();
 
@@ -299,12 +304,6 @@
         } finally {
             flowContext.lock.unlock();
         }
-
-        if (removeFlow && sessionInfo != null) {
-            synchronized (sessions) {
-                sessions.remove(sessionInfo);
-            }
-        }
     }
 
     protected void errorInternal(FlowContext flowContext, Throwable cause) {
@@ -486,8 +485,6 @@
             flowContext.signal();
             flowContext.unlock();
         }
-
-
     }
 
     @Override
@@ -510,9 +507,11 @@
     @Override
     public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
         FlowContext flowContext = (FlowContext) context.getAttachment();
+        logger.debug("dataReceived");
         flowContext.lock();
         try {
             try {
+                logger.debug("dataReceived-after lock");
                 FlowHandler client = flowContext.client();
                 FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
                 decoder.decode(data, flowContext);
@@ -651,8 +650,10 @@
     }
 
     protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) {
+        logger.debug("send");
         flowContext.lock();
         try {
+            logger.debug("send-after lock");
             Object req = event.getRequest();
             if (req != null) {
                 if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) {
@@ -686,18 +687,23 @@
                 }
 
                 try {
+                    flowContext.sentEvent = event;
                     flowContext.channelContext().writeAndFlush(buffer);
                     requestSent0(flowContext, event);
                     buffer.clear();
+
+                    if (wait && flowContext.isBidirectional()) {
+                        waitForResponse(flowContext);
+                    }
                 } catch (Exception e) {
                     flowContext.sendErrors++;
                     if (logger.isDebugEnabled()) {
                         debug(flowContext, e.getMessage(), e);
                     }
-                }
 
-                if (wait && flowContext.isBidirectional()) {
-                    waitForResponse(flowContext);
+                    if (flowContext.sendErrors == maxSendErrors) {
+                        error(flowContext, new FlowError(CODE_MAX_SEND_ERRORS_REACHED, "Max send errors reached."));
+                    }
                 }
             }
         } catch (Exception e) {
@@ -712,8 +718,9 @@
     }
 
     protected boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException {
+        logger.debug("waitForResponse");
         long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
-        while (flowContext.sentEvent != null && flowContext.state == STATE_CONNECTED) {
+        while (flowContext.sentEvent != null && !flowContext.isError()) {
             if (timeNanos <= 0) {
                 return false;
             }
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Apr 02 14:02:38 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Apr 02 15:34:59 2020 +0200
@@ -9,6 +9,7 @@
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseEncoder;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.utils.EventUtils;
@@ -18,6 +19,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static com.passus.st.utils.Assert.assertHttpClientEvents;
 import static org.testng.AssertJUnit.assertEquals;
@@ -25,7 +29,7 @@
 
 public class SynchFlowWorkerTest {
 
-    public static final long JOIN_TIMEOUT = 1_000;
+    public static final long JOIN_TIMEOUT = Long.MAX_VALUE;
 
     private final TestHttpClientListener listener = new TestHttpClientListener();
 
@@ -37,6 +41,8 @@
 
         private final DataEncoder encoder;
 
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
         public LocalEmitter() {
             this(HttpResponseEncoder.INSTANCE);
         }
@@ -76,11 +82,14 @@
             HttpResponse response = (HttpResponse) event.getResponse();
             ByteBuff buff = new HeapByteBuff();
             encoder.encode(response, buff);
-            try {
-                clientWorker.dataReceived(channelContext, buff);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
+
+            executor.execute(() -> {
+                try {
+                    clientWorker.dataReceived(channelContext, buff);
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
+            });
         }
 
         protected void close(LocalChannelContext channelContext) {
@@ -251,7 +260,7 @@
 
         }
 
-        assertFalse(worker.isWorking());
+        assertFalse("Worker is still working.", worker.isWorking());
     }
 
     private List<Event> readDefaultEvents() throws Exception {
@@ -290,7 +299,7 @@
     }
 
     @Test
-    public void testHandle_EmitterException() throws Exception {
+    public void testHandle_EmitterException_SendErrorsNotReached() throws Exception {
         List<Event> events = readDefaultEvents();
         LocalEmitter emitter = new LocalEmitter((object, out) -> {
             throw new RuntimeException("Test exception");
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Apr 02 14:02:38 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Apr 02 15:34:59 2020 +0200
@@ -54,7 +54,7 @@
         try {
             flowExecutor.start();
             events.forEach(flowExecutor::handle);
-            flowExecutor.join(2_000);
+            flowExecutor.join(5_000);
 
             assertEquals(0, listener.getErrors());
             assertEquals(2, listener.getReceived().size());