changeset 1023:831da81c262f

FlowWorkerBase refactoring in progress
author Devel 2
date Tue, 31 Mar 2020 11:55:55 +0200
parents fa7ecf793f0c
children 14adc04b1b8d
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java
diffstat 5 files changed, 109 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Mon Mar 30 15:27:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Tue Mar 31 11:55:55 2020 +0200
@@ -14,8 +14,6 @@
     public static final byte STATE_ERROR = -1;
     public static final byte STATE_CONNECTING = 0;
     public static final byte STATE_CONNECTED = 1;
-    /*    public static final byte STATE_REQ_SENT = 2;
-        public static final byte STATE_RESP_RECEIVED = 3;*/
     public static final byte STATE_DISCONNECTING = 4;
     public static final byte STATE_DISCONNECTED = 5;
 
@@ -55,7 +53,7 @@
 
     protected int sendErrors;
 
-    protected FlowError lastError;
+    protected FlowError error;
 
     @Deprecated
     protected DataDecoder decoder;
@@ -106,6 +104,24 @@
         return timeout;
     }
 
+    public boolean isBlocked() {
+        return blocked;
+    }
+
+    public void error(FlowError error) {
+        this.error = error;
+        this.state = STATE_ERROR;
+        blocked = true;
+    }
+
+    public FlowError error() {
+        return error;
+    }
+
+    public boolean isError() {
+        return error != null;
+    }
+
     public long receivedStartTimestamp() {
         return receivedStartTime;
     }
@@ -171,14 +187,6 @@
         return session;
     }
 
-    public boolean isBlocked() {
-        return blocked;
-    }
-
-    public FlowError getLastError() {
-        return lastError;
-    }
-
     public Object getParam(String name) {
         if (params == null) {
             return null;
@@ -226,6 +234,8 @@
         buffer = null;
         sentEvent = null;
         timeout = -1;
+        error = null;
+        blocked = false;
 
         if (params != null) {
             params.clear();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Mon Mar 30 15:27:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Tue Mar 31 11:55:55 2020 +0200
@@ -9,54 +9,76 @@
 
 public class FlowError {
 
-    public static final int CODE_UNKNOWN_ERROR = 0;
-    public static final int CODE_CONNECTION_ERROR = 1;
-    public static final int CODE_CONNECTION_REFUSED = 2;
-    public static final int CODE_CONNECTION_TIMEOUT = 3;
-    public static final int CODE_UNKNOWN_HOST = 4;
-    public static final int CODE_NO_ROUTE_TO_HOST = 5;
-    public static final int CODE_NETWORK_UNREACHABLE = 6;
+    public static final byte CODE_UNKNOWN_ERROR = 0;
 
-    public static final int CODE_READ_TIMEOUT = 7;
-    public static final int CODE_INVALID_RESPONSE = 8;
-    public static final int CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 9;
-    public static final int CODE_CONNECTION_RESET_BY_PEER = 10;
-    public static final int CODE_SOCKET_ERROR = 11;
-    public static final int CODE_IO_ERROR = 12;
+    public static final byte CODE_CONNECTION_ERROR = 1;
+    public static final byte CODE_CONNECTION_REFUSED = 2;
+    public static final byte CODE_CONNECTION_TIMEOUT = 3;
+    public static final byte CODE_UNKNOWN_HOST = 4;
+    public static final byte CODE_NO_ROUTE_TO_HOST = 5;
+    public static final byte CODE_NETWORK_UNREACHABLE = 6;
+    public static final byte CODE_CONNECTION_ATTEMPTS_REACHED = 7;
 
-    public static final int CODE_SSL_ERROR = 13;
-    public static final int CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 14;
-    public static final int CODE_SSL_HANDSHAKE_ERROR = 15;
+    public static final byte CODE_READ_TIMEOUT = 8;
+    public static final byte CODE_INVALID_RESPONSE = 9;
+    public static final byte CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 10;
+    public static final byte CODE_CONNECTION_RESET_BY_PEER = 11;
+    public static final byte CODE_SOCKET_ERROR = 12;
+    public static final byte CODE_IO_ERROR = 13;
+    public static final byte CODE_MAX_ENCODER_ERRORS_REACHED = 14;
+    public static final byte CODE_MAX_DECODER_ERRORS_REACHED = 15;
+    public static final byte CODE_IDE_TIMEOUT = 16;
 
-    private final int code;
+    public static final byte CODE_SSL_ERROR = 17;
+    public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18;
+    public static final byte CODE_SSL_HANDSHAKE_ERROR = 19;
+
+    private final byte code;
 
     private final String message;
 
-    private final Exception cause;
+    private final Throwable cause;
 
-    public FlowError(int code, String message) {
+    public FlowError(byte code, String message) {
         this(code, message, null);
     }
 
-    public FlowError(int code, String message, Exception cause) {
+    public FlowError(byte code, String message, Throwable cause) {
         this.code = code;
         this.message = message;
         this.cause = cause;
     }
 
-    public int getCode() {
+    public byte code() {
         return code;
     }
 
-    public String getMessage() {
+    public String message() {
         return message;
     }
 
-    public Exception getCause() {
+    public Throwable cause() {
         return cause;
     }
 
-    public static FlowError interpret(Exception ex, String stackTrace) {
+    @Override
+    public String toString() {
+        return "FlowError{" +
+                "code=" + code +
+                ", message='" + message + '\'' +
+                ", cause=" + cause +
+                '}';
+    }
+
+    public static FlowError unknownError() {
+        return unknownError(null);
+    }
+
+    public static FlowError unknownError(Throwable cause) {
+        return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", cause);
+    }
+
+    public static FlowError interpret(Throwable ex, String stackTrace) {
         if (stackTrace.contains("Connection refused")) {
             return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex);
         } else if (stackTrace.contains("connect timed out")) {
@@ -92,10 +114,10 @@
             return new FlowError(CODE_IO_ERROR, "Networking IO error.", ex);
         }
 
-        return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex);
+        return unknownError(ex);
     }
 
-    public static FlowError interpret(Exception ex) {
+    public static FlowError interpret(Throwable ex) {
         try {
             String stackTrace;
             try (StringWriter sw = new StringWriter();
@@ -109,7 +131,7 @@
 
         }
 
-        return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex);
+        return unknownError(ex);
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Mar 30 15:27:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Tue Mar 31 11:55:55 2020 +0200
@@ -12,13 +12,14 @@
 import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap;
 import it.unimi.dsi.fastutil.bytes.Byte2LongMap;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.passus.st.client.FlowContext.*;
+import static com.passus.st.client.FlowError.CODE_CONNECTION_ATTEMPTS_REACHED;
+import static com.passus.st.client.FlowError.CODE_MAX_ENCODER_ERRORS_REACHED;
 import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS;
 
 public abstract class FlowWorkerBase extends FlowWorker {
@@ -255,30 +256,39 @@
         }
     }
 
-    private void error(FlowContext flowContext, Throwable cause) {
+    protected void error(FlowContext flowContext, Throwable cause) {
+        error(flowContext, FlowError.interpret(cause));
+    }
+
+    protected void error(FlowContext flowContext, FlowError error) {
         if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) {
             disconnect(flowContext, false);
         }
 
-        flowContext.blocked = true;
-        updateFlowState(flowContext, STATE_ERROR);
+        flowContext.error(error);
+        flowError(flowContext);
     }
 
-    private void requestSent0(FlowContext flowContext) {
-        requestSent(flowContext);
-    }
-
-    protected void requestSent(FlowContext flowContext) {
+    protected void flowError(FlowContext flowContext) {
 
     }
 
-    private void responseReceived0(FlowContext flowContext) {
-        responseReceived(flowContext);
+    private void requestSent0(FlowContext flowContext, SessionPayloadEvent event) {
+        flowContext.sentEvent = event;
+        onRequestSent(flowContext, event);
+    }
+
+    protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) {
+
+    }
+
+    private void responseReceived0(FlowContext flowContext, Object response) {
+        onResponseReceived(flowContext, response);
         flowContext.sentEvent(null);
         flowContext.receivedStartTimestamp(-1);
     }
 
-    protected void responseReceived(FlowContext flowContext) {
+    protected void onResponseReceived(FlowContext flowContext, Object response) {
 
     }
 
@@ -442,7 +452,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    responseReceived0(flowContext);
+                    responseReceived0(flowContext, null);
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
                     if (collectMetric) {
                         synchronized (metric) {
@@ -466,7 +476,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    responseReceived0(flowContext);
+                    responseReceived0(flowContext, resp);
                 }
             } catch (Exception e) {
                 if (collectMetric) {
@@ -478,6 +488,8 @@
                 if (logger.isDebugEnabled()) {
                     debug(flowContext, e.getMessage(), e);
                 }
+
+                error(flowContext, FlowError.unknownError());
             }
 
             lock.notifyAll();
@@ -510,7 +522,7 @@
 
                 flowContext.client().onDataWriteEnd(flowContext);
                 if (!flowContext.isBidirectional()) {
-                    responseReceived0(flowContext);
+                    responseReceived0(flowContext, null);
                 }
             }
 
@@ -548,7 +560,7 @@
                 }
             }
 
-            error(flowContext, cause);
+            error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached."));
             lock.notifyAll();
         }
     }
@@ -574,7 +586,7 @@
                     }
 
                     if (flowContext.encoderErrors == maxEncoderErrors) {
-                        error(flowContext, new IOException());
+                        error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached."));
                     }
 
                     return false;
@@ -588,9 +600,8 @@
                 }
 
                 try {
-                    flowContext.sentEvent(event);
                     flowContext.channelContext().writeAndFlush(buffer);
-                    requestSent0(flowContext);
+                    requestSent0(flowContext, event);
                     buffer.clear();
                     return true;
                 } catch (Exception e) {
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Mon Mar 30 15:27:57 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java	Tue Mar 31 11:55:55 2020 +0200
@@ -10,7 +10,6 @@
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
 import java.net.ConnectException;
 
 import static com.passus.net.session.Session.PROTOCOL_TCP;
@@ -18,8 +17,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.*;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNull;
+import static org.testng.AssertJUnit.*;
 
 public class FlowWorkerBaseTest {
 
@@ -82,7 +80,6 @@
         MutableInt count = new MutableInt(0);
         doAnswer((Answer<Void>) invocation ->
                 applyConnect(invocation, (sessionInfo, handler, integer) -> {
-
                     if (count.intValue() == 1) {
                         handler.channelActive(channelContext);
                     } else {
@@ -100,7 +97,8 @@
         assertEquals(1, count.intValue());
         assertEquals(FlowContext.STATE_CONNECTED, flowContext.state);
         assertEquals(0, flowContext.connectionAttempts);
-        assertEquals(false, flowContext.blocked);
+        assertFalse(flowContext.blocked);
+        assertFalse(flowContext.isError());
     }
 
     @Test
@@ -117,8 +115,12 @@
         worker.connect(session);
         FlowContext flowContext = worker.flowContext(session);
 
-        assertEquals(FlowContext.STATE_ERROR, flowContext.state);
         assertEquals(true, flowContext.blocked);
+        assertTrue(flowContext.isError());
+        FlowError error = flowContext.error();
+        assertEquals(FlowError.CODE_CONNECTION_ATTEMPTS_REACHED, error.code());
+        assertEquals("Max connection attempts reached.", error.message());
+        assertEquals(null, error.cause());
     }
 
     @Test
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Mon Mar 30 15:27:57 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Tue Mar 31 11:55:55 2020 +0200
@@ -79,7 +79,7 @@
             });
             flowExecutor.start();
             events.forEach(flowExecutor::handle);
-            flowExecutor.join();
+            flowExecutor.join(2_000);
 
             Dns dnsResponse = res.getValue();
             assertNotNull(dnsResponse);