Mercurial > stress-tester
changeset 1112:35118d7d6acf
FlowContext.STATE_REGISTERED added, FlowContext creation after session registered, bugfixes
author | Devel 2 |
---|---|
date | Thu, 21 May 2020 10:18:41 +0200 |
parents | 7da8b2a6bb2e |
children | 37d6e04592dc |
files | stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java |
diffstat | 6 files changed, 64 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu May 21 10:18:41 2020 +0200 @@ -13,11 +13,12 @@ public class FlowContext { public static final byte STATE_NEW = 0; - public static final byte STATE_CONNECTING = 1 << 1; - public static final byte STATE_CONNECTED = 1 << 2; - public static final byte STATE_DISCONNECTING = 1 << 3; - public static final byte STATE_DISCONNECTED = 1 << 4; - public static final byte STATE_ERROR = 1 << 5; + public static final byte STATE_REGISTERED = 1 << 1; + public static final byte STATE_CONNECTING = 1 << 2; + public static final byte STATE_CONNECTED = 1 << 3; + public static final byte STATE_DISCONNECTING = 1 << 4; + public static final byte STATE_DISCONNECTED = 1 << 5; + public static final byte STATE_ERROR = 1 << 6; public static final int INIT_BUFFER_CAPACITY = 1024;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Thu May 21 10:18:41 2020 +0200 @@ -22,6 +22,10 @@ FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext); + default void onRegistered(FlowContext flowContext) { + + } + default void onConnected(FlowContext flowContext) { }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Thu May 21 10:18:41 2020 +0200 @@ -146,23 +146,17 @@ public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { if (logger.isDebugEnabled()) { - logger.debug("Error occurred. ", cause); + logger.debug("Error occurred. " + cause.getMessage(), cause); } FlowContext flowContext = context.getFlowContext(); - //Jezeli nie nastapilo polaczenie flowContext == null - if (flowContext == null) { - flowContext = supervisor.flowContext(context); - } - flowContext.lock(); try { - if (flowContext.state == STATE_CONNECTING) { - if (flowContext.connectionAttempts <= maxConnectionAttempts) { + if (flowContext.state <= STATE_CONNECTING) { + if (flowContext.connectionAttempts < maxConnectionAttempts) { if (logger.isDebugEnabled()) { logger.debug("Connection failed. Reconnection (attempt {}/{}, delay {}ms).", - flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay, - cause); + flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay); } if (reconnectDelay > 0) { @@ -176,16 +170,16 @@ connect(flowContext); return; } else { + flowContext.block(); + error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); if (logger.isDebugEnabled()) { - logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts, cause); + logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts); } } } } finally { flowContext.signalAndUnlock(); } - - error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); } protected void error(FlowContext flowContext, Throwable cause) { @@ -202,38 +196,51 @@ } @Override - public void channelActive(ChannelContext context) throws Exception { + public void channelRegistered(ChannelContext context) throws Exception { FlowContext flowContext = supervisor.flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); + flowContext.lock(); + try { + flowContext.state = STATE_REGISTERED; + flowContext.channelContext(context); + context.setFlowContext(flowContext); + flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); + + flowContext.flowHandler.onRegistered(flowContext); + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signalAndUnlock(); + } + } + + @Override + public void channelActive(ChannelContext context) throws Exception { + FlowContext flowContext = context.getFlowContext(); + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + + flowContext.lock(); + try { + context.setBidirectional(flowContext.isBidirectional()); + flowContext.connectionAttempts = 0; + flowContext.state = STATE_CONNECTED; + + try { + supervisor.onConnected(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Error occurred during onConnected calling.", e); + } } - flowContext.lock(); - try { - context.setBidirectional(flowContext.isBidirectional()); - flowContext.channelContext(context); - context.setFlowContext(flowContext); - flowContext.connectionAttempts = 0; - flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); - flowContext.state = STATE_CONNECTED; - - try { - supervisor.onConnected(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(logger, flowContext, "Error occurred during onConnected calling.", e); - } - } - - flowContext.flowHandler.onConnected(flowContext); - } catch (Exception ex) { - error(flowContext, ex); - } finally { - flowContext.signalAndUnlock(); - } + flowContext.flowHandler.onConnected(flowContext); + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signalAndUnlock(); } } @@ -273,7 +280,6 @@ flowContext.lock(); try { disconnect(flowContext); - //addBlockedSession(session); } finally { flowContext.signalAndUnlock(); }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Thu May 21 10:18:41 2020 +0200 @@ -85,7 +85,8 @@ } public static boolean mayReconnect(FlowContext flowContext) { - if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) { + if (flowContext.state == STATE_CONNECTED + || !flowContext.sessionEstablishedSeen) { return false; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Thu May 21 10:18:41 2020 +0200 @@ -16,9 +16,9 @@ protected final Logger logger = LogManager.getLogger(getClass()); - public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 3; - public static final int DEFAULT_MAX_ENCODER_ERRORS = 3; - public static final int DEFAULT_MAX_SEND_ERRORS = 3; + public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 1; + public static final int DEFAULT_MAX_ENCODER_ERRORS = 1; + public static final int DEFAULT_MAX_SEND_ERRORS = 1; public static final int DEFAULT_RECONNECT_DELAY = 1000; public static final float TIMEOUT_FACTOR = 1.75f;
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed May 20 13:50:13 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu May 21 10:18:41 2020 +0200 @@ -261,7 +261,6 @@ if (event.getType() == SessionStatusEvent.TYPE) { SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent; processFlowSessionStatusEvent(statusEvent, true); - return; } else if (event.getType() == SessionPayloadEvent.TYPE) { FlowContext flowContext = flowContext(sessEvent);