changeset 1112:35118d7d6acf

FlowContext.STATE_REGISTERED added, FlowContext creation after session registered, bugfixes
author Devel 2
date Thu, 21 May 2020 10:18:41 +0200
parents 7da8b2a6bb2e
children 37d6e04592dc
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java
diffstat 6 files changed, 64 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu May 21 10:18:41 2020 +0200
@@ -13,11 +13,12 @@
 public class FlowContext {
 
     public static final byte STATE_NEW = 0;
-    public static final byte STATE_CONNECTING = 1 << 1;
-    public static final byte STATE_CONNECTED = 1 << 2;
-    public static final byte STATE_DISCONNECTING = 1 << 3;
-    public static final byte STATE_DISCONNECTED = 1 << 4;
-    public static final byte STATE_ERROR = 1 << 5;
+    public static final byte STATE_REGISTERED = 1 << 1;
+    public static final byte STATE_CONNECTING = 1 << 2;
+    public static final byte STATE_CONNECTED = 1 << 3;
+    public static final byte STATE_DISCONNECTING = 1 << 4;
+    public static final byte STATE_DISCONNECTED = 1 << 5;
+    public static final byte STATE_ERROR = 1 << 6;
 
     public static final int INIT_BUFFER_CAPACITY = 1024;
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Thu May 21 10:18:41 2020 +0200
@@ -22,6 +22,10 @@
 
     FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext);
 
+    default void onRegistered(FlowContext flowContext) {
+
+    }
+
     default void onConnected(FlowContext flowContext) {
 
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Thu May 21 10:18:41 2020 +0200
@@ -146,23 +146,17 @@
 
     public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         if (logger.isDebugEnabled()) {
-            logger.debug("Error occurred. ", cause);
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
         }
 
         FlowContext flowContext = context.getFlowContext();
-        //Jezeli nie nastapilo polaczenie flowContext == null
-        if (flowContext == null) {
-            flowContext = supervisor.flowContext(context);
-        }
-
         flowContext.lock();
         try {
-            if (flowContext.state == STATE_CONNECTING) {
-                if (flowContext.connectionAttempts <= maxConnectionAttempts) {
+            if (flowContext.state <= STATE_CONNECTING) {
+                if (flowContext.connectionAttempts < maxConnectionAttempts) {
                     if (logger.isDebugEnabled()) {
                         logger.debug("Connection failed. Reconnection (attempt {}/{}, delay {}ms).",
-                                flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay,
-                                cause);
+                                flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay);
                     }
 
                     if (reconnectDelay > 0) {
@@ -176,16 +170,16 @@
                     connect(flowContext);
                     return;
                 } else {
+                    flowContext.block();
+                    error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached."));
                     if (logger.isDebugEnabled()) {
-                        logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts, cause);
+                        logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts);
                     }
                 }
             }
         } finally {
             flowContext.signalAndUnlock();
         }
-
-        error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached."));
     }
 
     protected void error(FlowContext flowContext, Throwable cause) {
@@ -202,38 +196,51 @@
     }
 
     @Override
-    public void channelActive(ChannelContext context) throws Exception {
+    public void channelRegistered(ChannelContext context) throws Exception {
         FlowContext flowContext = supervisor.flowContext(context);
-        if (flowContext != null) {
-            if (logger.isDebugEnabled()) {
-                debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                        context.getLocalAddress(),
-                        context.getRemoteAddress());
+        flowContext.lock();
+        try {
+            flowContext.state = STATE_REGISTERED;
+            flowContext.channelContext(context);
+            context.setFlowContext(flowContext);
+            flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
+
+            flowContext.flowHandler.onRegistered(flowContext);
+        } catch (Exception ex) {
+            error(flowContext, ex);
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelContext context) throws Exception {
+        FlowContext flowContext = context.getFlowContext();
+        if (logger.isDebugEnabled()) {
+            debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                    context.getLocalAddress(),
+                    context.getRemoteAddress());
+        }
+
+        flowContext.lock();
+        try {
+            context.setBidirectional(flowContext.isBidirectional());
+            flowContext.connectionAttempts = 0;
+            flowContext.state = STATE_CONNECTED;
+
+            try {
+                supervisor.onConnected(flowContext);
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    debug(logger, flowContext, "Error occurred during onConnected calling.", e);
+                }
             }
 
-            flowContext.lock();
-            try {
-                context.setBidirectional(flowContext.isBidirectional());
-                flowContext.channelContext(context);
-                context.setFlowContext(flowContext);
-                flowContext.connectionAttempts = 0;
-                flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY));
-                flowContext.state = STATE_CONNECTED;
-
-                try {
-                    supervisor.onConnected(flowContext);
-                } catch (Exception e) {
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, "Error occurred during onConnected calling.", e);
-                    }
-                }
-
-                flowContext.flowHandler.onConnected(flowContext);
-            } catch (Exception ex) {
-                error(flowContext, ex);
-            } finally {
-                flowContext.signalAndUnlock();
-            }
+            flowContext.flowHandler.onConnected(flowContext);
+        } catch (Exception ex) {
+            error(flowContext, ex);
+        } finally {
+            flowContext.signalAndUnlock();
         }
     }
 
@@ -273,7 +280,6 @@
             flowContext.lock();
             try {
                 disconnect(flowContext);
-                //addBlockedSession(session);
             } finally {
                 flowContext.signalAndUnlock();
             }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Thu May 21 10:18:41 2020 +0200
@@ -85,7 +85,8 @@
     }
 
     public static boolean mayReconnect(FlowContext flowContext) {
-        if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) {
+        if (flowContext.state == STATE_CONNECTED
+                || !flowContext.sessionEstablishedSeen) {
             return false;
         }
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Thu May 21 10:18:41 2020 +0200
@@ -16,9 +16,9 @@
 
     protected final Logger logger = LogManager.getLogger(getClass());
 
-    public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 3;
-    public static final int DEFAULT_MAX_ENCODER_ERRORS = 3;
-    public static final int DEFAULT_MAX_SEND_ERRORS = 3;
+    public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 1;
+    public static final int DEFAULT_MAX_ENCODER_ERRORS = 1;
+    public static final int DEFAULT_MAX_SEND_ERRORS = 1;
     public static final int DEFAULT_RECONNECT_DELAY = 1000;
 
     public static final float TIMEOUT_FACTOR = 1.75f;
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed May 20 13:50:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu May 21 10:18:41 2020 +0200
@@ -261,7 +261,6 @@
             if (event.getType() == SessionStatusEvent.TYPE) {
                 SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
                 processFlowSessionStatusEvent(statusEvent, true);
-
                 return;
             } else if (event.getType() == SessionPayloadEvent.TYPE) {
                 FlowContext flowContext = flowContext(sessEvent);