Mercurial > stress-tester
changeset 1034:51ef66fadb5c
FlowProcessor, FlowProcessorSupervisor
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java Mon Apr 06 14:13:39 2020 +0200 @@ -0,0 +1,92 @@ +package com.passus.st.client; + +import com.passus.st.emitter.Emitter; +import com.passus.st.filter.FlowFilterChain; + +public abstract class AbstractFlowProcessorSupervisorWrapper implements FlowProcessorSupervisor { + + private final FlowWorker flowWorker; + + public AbstractFlowProcessorSupervisorWrapper(FlowWorker flowWorker) { + this.flowWorker = flowWorker; + } + + public FlowWorker getFlowWorker() { + return flowWorker; + } + + @Override + public Emitter getEmitter() { + return flowWorker.emitter; + } + + @Override + public Timeouts getTimeouts() { + return flowWorker.getTimeouts(); + } + + @Override + public FlowFilterChain getFilterChain() { + return flowWorker.filterChain; + } + + @Override + public int getMaxEncoderErrors() { + return flowWorker.maxEncoderErrors; + } + + @Override + public int getMaxConnectionAttempts() { + return flowWorker.maxConnectionAttempts; + } + + @Override + public int getMaxSendErrors() { + return flowWorker.maxSendErrors; + } + + @Override + public long getReconnectDelay() { + return flowWorker.reconnectDelay; + } + + @Override + public boolean isCollectMetrics() { + return flowWorker.collectMetric; + } + + @Override + public FlowMetric getFlowMetric() { + return flowWorker.metric; + } + + @Override + public void onConnected(FlowContext flowContext) { + + } + + @Override + public void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) { + + } + + @Override + public void onResponseReceived(FlowContext flowContext, Object response) { + + } + + @Override + public void onDisconnecting(FlowContext flowContext) { + + } + + @Override + public void onDisconnected(FlowContext flowContext) { + + } + + @Override + public void onError(FlowContext flowContext) { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Mon Apr 06 14:13:39 2020 +0200 @@ -0,0 +1,453 @@ +package com.passus.st.client; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.data.HeapByteBuff; +import com.passus.filter.Filter; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.filter.FlowFilterChain; +import org.apache.logging.log4j.Logger; + +import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowError.*; +import static com.passus.st.client.FlowUtils.*; + +public class FlowProcessor implements EmitterHandler { + + private final Logger logger; + + private final Emitter emitter; + + private final Timeouts timeouts; + + private final FlowFilterChain filterChain; + + private final int workerIndex; + + private final FlowProcessorSupervisor supervisor; + + private final boolean collectMetric; + + private final FlowMetric metric; + + private final int maxConnectionAttempts; + + private final long reconnectDelay; + + private final int maxEncoderErrors; + + private final int maxSendErrors; + + private final boolean trace; + + public FlowProcessor(FlowProcessorSupervisor supervisor, Logger logger, int workerIndex) { + this.supervisor = supervisor; + this.emitter = supervisor.getEmitter(); + this.timeouts = supervisor.getTimeouts(); + this.filterChain = supervisor.getFilterChain(); + this.workerIndex = workerIndex; + this.collectMetric = supervisor.isCollectMetrics(); + this.metric = supervisor.getFlowMetric(); + this.maxEncoderErrors = supervisor.getMaxEncoderErrors(); + this.maxSendErrors = supervisor.getMaxSendErrors(); + this.maxConnectionAttempts = supervisor.getMaxConnectionAttempts(); + this.reconnectDelay = supervisor.getReconnectDelay(); + + this.logger = logger; + this.trace = logger.isTraceEnabled(); + } + + public FlowProcessorSupervisor getSupervisor() { + return supervisor; + } + + protected void connect(FlowContext flowContext) { + connect(flowContext, true); + } + + protected void connect(FlowContext flowContext, boolean wait) { + flowContext.lock(); + try { + flowContext.connectionAttempts++; + flowContext.state = STATE_CONNECTING; + emitter.connect(flowContext.session, this, workerIndex); + if (wait) { + waitOpFinished(flowContext, STATE_CONNECTED); + } + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signalAndUnlock(); + } + } + + protected void disconnect(FlowContext flowContext) { + disconnect(flowContext, true); + } + + protected void disconnect(FlowContext flowContext, boolean wait) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Disconnect."); + } + + long now = System.currentTimeMillis(); + flowContext.lock(); + try { + if (trace) { + debug(logger, flowContext, "Disconnecting."); + } + + if (flowContext.state == STATE_DISCONNECTING + || flowContext.state == STATE_DISCONNECTED) { + return; + } + + flowContext.state = STATE_DISCONNECTING; + flowContext.timeout = now + timeouts.getDisconnectingTimeout(); + + try { + supervisor.onDisconnecting(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Error occurred during onDisconnecting calling.", e); + } + } + + if (flowContext.channelContext() != null) { + try { + flowContext.channelContext().close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + + if (wait) { + waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); + } + } catch (InterruptedException e) { + error(flowContext, e); + } finally { + flowContext.signalAndUnlock(); + } + } + + public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Error occurred. ", cause); + } + + FlowContext flowContext = context.getFlowContext(); + //Jezeli nie nastapilo polaczenie flowContext == null + if (flowContext == null) { + flowContext = supervisor.flowContext(context); + } + + flowContext.lock(); + try { + if (flowContext.state == STATE_CONNECTING) { + if (flowContext.connectionAttempts <= maxConnectionAttempts) { + if (logger.isDebugEnabled()) { + logger.debug("Connection failed. Reconnection (attempt {}/{}, delay {}ms).", + flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay, + cause); + } + + if (reconnectDelay > 0) { + try { + Thread.sleep(reconnectDelay); + } catch (InterruptedException ignore) { + + } + } + + connect(flowContext); + return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts, cause); + } + } + } + } finally { + flowContext.signalAndUnlock(); + } + + error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); + } + + protected void error(FlowContext flowContext, Throwable cause) { + error(flowContext, FlowError.interpret(cause)); + } + + protected void error(FlowContext flowContext, FlowError error) { + if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) { + disconnect(flowContext, false); + } + + flowContext.error(error); + supervisor.onError(flowContext); + } + + @Override + public void channelActive(ChannelContext context) throws Exception { + FlowContext flowContext = supervisor.flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + + flowContext.lock(); + try { + context.setBidirectional(flowContext.isBidirectional()); + flowContext.channelContext(context); + context.setFlowContext(flowContext); + flowContext.connectionAttempts = 0; + flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); + flowContext.state = STATE_CONNECTED; + + try { + supervisor.onConnected(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Error occurred during onConnected calling.", e); + } + } + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signalAndUnlock(); + } + } + } + + @Override + public void channelInactive(ChannelContext context) throws Exception { + FlowContext flowContext = context.getFlowContext(); + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Channel inactive."); + } + + flowContext.lock(); + try { + flowContext.state = STATE_DISCONNECTED; + try { + supervisor.onDisconnected(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Error occurred during onDisconnected calling.", e); + } + } + } finally { + flowContext.clear(); + flowContext.signal(); + flowContext.unlock(); + } + } + + @Override + public void sessionInvalidated(SessionInfo session) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Session {} invalidated.", session); + } + + FlowContext flowContext = supervisor.flowContext(session); + if (flowContext != null) { + flowContext.lock(); + try { + disconnect(flowContext); + //addBlockedSession(session); + } finally { + flowContext.signalAndUnlock(); + } + } + } + + private void responseReceived0(FlowContext flowContext, Object response) { + supervisor.onResponseReceived(flowContext, response); + flowContext.sentEvent(null); + flowContext.receivedStartTimestamp(-1); + } + + @Override + public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + FlowContext flowContext = context.getFlowContext(); + logger.debug("dataReceived"); + flowContext.lock(); + try { + try { + logger.debug("dataReceived-after lock"); + FlowHandler client = flowContext.client(); + FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); + decoder.decode(data, flowContext); + + long now = System.currentTimeMillis(); + if (flowContext.receivedStartTimestamp() == -1) { + flowContext.receivedStartTimestamp(now); + } + + if (decoder.state() == DataDecoder.STATE_ERROR) { + if (collectMetric) { + synchronized (metric) { + metric.incErrorNum(); + } + } + + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Decoder error. " + decoder.getLastError()); + } + + decoder.clear(flowContext); + responseReceived0(flowContext, null); + } else if (decoder.state() == DataDecoder.STATE_FINISHED) { + if (collectMetric) { + synchronized (metric) { + metric.incResponsesNum(); + metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); + } + } + + Object resp = decoder.getResult(); + Object req = null; + if (flowContext.sentEvent() != null) { + req = flowContext.sentEvent().getRequest(); + } + + decoder.clear(flowContext); + if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { + responseReceived0(flowContext, resp); + } + } + } catch (Exception e) { + if (collectMetric) { + synchronized (metric) { + metric.incErrorNum(); + } + } + + if (logger.isDebugEnabled()) { + debug(logger, flowContext, e.getMessage(), e); + } + + error(flowContext, FlowError.unknownError()); + } + } finally { + flowContext.signalAndUnlock(); + } + } + + @Override + public void dataWriteStart(ChannelContext context) { + FlowContext flowContext = context.getFlowContext(); + flowContext.lock(); + try { + if (flowContext.sentEvent() != null) { + flowContext.writeStartTime = System.currentTimeMillis(); + flowContext.writeEndTime = -1; + flowContext.client.onDataWriteStart(flowContext); + } + } finally { + flowContext.signalAndUnlock(); + } + } + + @Override + public void dataWritten(ChannelContext context) throws Exception { + FlowContext flowContext = context.getFlowContext(); + flowContext.lock(); + try { + if (flowContext.isEventSent()) { + long now = System.currentTimeMillis(); + if (collectMetric) { + synchronized (metric) { + metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); + } + } + + flowContext.writeEndTime = now; + flowContext.client.onDataWriteEnd(flowContext); + } + } finally { + flowContext.signalAndUnlock(); + } + } + + private void requestSent0(FlowContext flowContext, SessionPayloadEvent event) { + flowContext.sentEvent = event; + supervisor.onRequestSent(flowContext, event); + } + + protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { + flowContext.lock(); + try { + Object req = event.getRequest(); + if (req != null) { + if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { + return; + } + + ByteBuff buffer; + FlowHandler client = flowContext.client(); + FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); + buffer = flowContext.buffer(); + try { + encoder.encode(req, flowContext, buffer); + } catch (Exception e) { + flowContext.encoderErrors++; + if (logger.isDebugEnabled()) { + debug(logger, flowContext, e.getMessage(), e); + } + + if (flowContext.encoderErrors == maxEncoderErrors) { + error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached.")); + } + + return; + } + + if (collectMetric) { + synchronized (metric) { + metric.incRequestsNum(); + metric.addRequestSize(flowContext.buffer().readableBytes()); + } + } + + try { + flowContext.sentEvent = event; + flowContext.writeStartTime = -1; + flowContext.writeEndTime = -1; + flowContext.channelContext().writeAndFlush(buffer); + requestSent0(flowContext, event); + buffer.clear(); + + if (wait) { + if (flowContext.isBidirectional()) { + waitForResponse(flowContext); + } else { + waitForWriteEnd(flowContext); + } + } + } catch (Exception e) { + flowContext.sendErrors++; + if (logger.isDebugEnabled()) { + debug(logger, flowContext, e.getMessage(), e); + } + + if (flowContext.sendErrors == maxSendErrors) { + error(flowContext, new FlowError(CODE_MAX_SEND_ERRORS_REACHED, "Max send errors reached.")); + } + } + } + } catch (Exception e) { + error(flowContext, e); + } finally { + flowContext.unlock(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java Mon Apr 06 14:13:39 2020 +0200 @@ -0,0 +1,46 @@ +package com.passus.st.client; + +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.filter.FlowFilterChain; + +public interface FlowProcessorSupervisor { + + Emitter getEmitter(); + + Timeouts getTimeouts(); + + FlowFilterChain getFilterChain(); + + int getMaxEncoderErrors(); + + int getMaxConnectionAttempts(); + + int getMaxSendErrors(); + + long getReconnectDelay(); + + FlowContext flowContext(SessionInfo session); + + default FlowContext flowContext(ChannelContext context) { + return flowContext(context.getSessionInfo()); + } + + boolean isCollectMetrics(); + + FlowMetric getFlowMetric(); + + void onConnected(FlowContext flowContext); + + void onRequestSent(FlowContext flowContext, SessionPayloadEvent event); + + void onResponseReceived(FlowContext flowContext, Object response); + + void onDisconnecting(FlowContext flowContext); + + void onDisconnected(FlowContext flowContext); + + void onError(FlowContext flowContext); + +}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Mon Apr 06 14:13:39 2020 +0200 @@ -4,17 +4,105 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.TimeUnit; -import static com.passus.st.client.FlowContext.STATE_DISCONNECTED; +import static com.passus.st.client.FlowContext.STATE_CONNECTED; +import static com.passus.st.client.FlowError.*; public class FlowUtils { private FlowUtils() { } + public static Event eventInstanceForWorker(Event event, int workerIndex) { + if (event instanceof SessionEvent) { + Event newEvent = ((SessionEvent) event).instanceForWorker(workerIndex); + newEvent.setTimestamp(event.getTimestamp()); + return newEvent; + } else { + return event; + } + } + + public static boolean waitForWriteEnd(FlowContext flowContext) throws InterruptedException { + return waitForWriteEnd(flowContext, Timeouts.DEFAULT_TIMEOUT); + } + + public static boolean waitForWriteEnd(FlowContext flowContext, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while (flowContext.writeEndTime == -1 && !flowContext.isError()) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); + } + + return true; + } + + public static boolean waitForResponse(FlowContext flowContext) throws InterruptedException { + return waitForResponse(flowContext, Timeouts.DEFAULT_TIMEOUT); + } + + public static boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while (flowContext.sentEvent != null && !flowContext.isError()) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); + } + + return true; + } + + public static boolean waitOpFinished(FlowContext flowContext, byte neededFlags) throws InterruptedException { + return waitOpFinished(flowContext, neededFlags, Timeouts.DEFAULT_TIMEOUT); + } + + public static boolean waitOpFinished(FlowContext flowContext, byte stateNeeded, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while ((flowContext.state & stateNeeded) != stateNeeded && !flowContext.isError()) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); + } + + return true; + } + + public static boolean mayReconnect(FlowContext flowContext) { + if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) { + return false; + } + + return flowContext.error != null + && (flowContext.error.code() == CODE_CONNECTION_RESET_BY_PEER + || flowContext.error.code() == CODE_CONNECTION_CLOSED_UNEXPECTEDLY + || flowContext.error.code() == CODE_IDE_TIMEOUT); + } + + public static boolean checkMayConnectIfPartial(FlowContext flowContext, boolean connectPartialSession) { + return connectPartialSession + && !flowContext.sessionEstablishedSeen + && !flowContext.isError(); + } + + public static void sleepSilently(long millis) { + if (millis <= 0) { + return; + } + + try { + Thread.sleep(millis); + } catch (InterruptedException ignore) { + } + } + public static void trace(Logger log, FlowContext flowContext, String message, Object... args) { log(log, flowContext, Level.TRACE, message, args); } @@ -32,7 +120,7 @@ log.log(level, message, cause); } - public static final void debug(Logger log, FlowContext flowContext, String message, Object... args) { + public static void debug(Logger log, FlowContext flowContext, String message, Object... args) { log(log, flowContext, Level.DEBUG, message, args); }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Mon Apr 06 14:13:39 2020 +0200 @@ -3,13 +3,12 @@ import com.passus.commons.Assert; import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; -import com.passus.st.filter.FlowFilterChain; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; +import com.passus.st.filter.FlowFilterChain; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,6 +16,11 @@ protected final Logger logger = LogManager.getLogger(getClass()); + public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 3; + public static final int DEFAULT_MAX_ENCODER_ERRORS = 3; + public static final int DEFAULT_MAX_SEND_ERRORS = 3; + public static final int DEFAULT_RECONNECT_DELAY = 1000; + protected final int index; private ClientListener listener; @@ -35,6 +39,16 @@ protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + protected int maxConnectionAttempts = DEFAULT_MAX_CONNECTION_ATTEMPTS; + + protected int maxEncoderErrors = DEFAULT_MAX_ENCODER_ERRORS; + + protected int maxSendErrors = DEFAULT_MAX_SEND_ERRORS; + + protected long reconnectDelay = DEFAULT_RECONNECT_DELAY; + + protected Timeouts timeouts = new Timeouts(); + protected final boolean trace; public FlowWorker(Emitter emitter, String name, int index) { @@ -45,6 +59,10 @@ this.trace = logger.isTraceEnabled(); } + public Emitter getEmitter() { + return emitter; + } + public boolean isConnectPartialSession() { return connectPartialSession; } @@ -53,10 +71,36 @@ this.connectPartialSession = connectPartialSession; } - public abstract boolean isWorking(); + public int getMaxConnectionAttempts() { + return maxConnectionAttempts; + } - public int index() { - return index; + public void setMaxConnectionAttempts(int maxConnectionAttempts) { + this.maxConnectionAttempts = maxConnectionAttempts; + } + + public long getReconnectDelay() { + return reconnectDelay; + } + + public void setReconnectDelay(long reconnectDelay) { + this.reconnectDelay = reconnectDelay; + } + + public int getMaxEncoderErrors() { + return maxEncoderErrors; + } + + public void setMaxEncoderErrors(int maxEncoderErrors) { + this.maxEncoderErrors = maxEncoderErrors; + } + + public int getMaxSendErrors() { + return maxSendErrors; + } + + public void setMaxSendErrors(int maxSendErrors) { + this.maxSendErrors = maxSendErrors; } public FlowFilterChain filterChain() { @@ -68,6 +112,16 @@ this.filterChain = filterChain; } + public Timeouts getTimeouts() { + return timeouts; + } + + public abstract boolean isWorking(); + + public int index() { + return index; + } + public FlowHandlerFactory getClientFactory() { return clientFactory; } @@ -91,7 +145,7 @@ this.listener = listener; } - @Override + @Override() public boolean isCollectMetrics() { return metric != null; } @@ -138,43 +192,4 @@ public abstract void handle(Event event); - protected final void trace(FlowContext flowContext, String message, Object... args) { - log(flowContext, Level.TRACE, message, args); - } - - protected final void debug(FlowContext flowContext, String message, Throwable cause) { - log(flowContext, Level.DEBUG, message, cause); - } - - protected final void error(FlowContext flowContext, String message, Throwable cause) { - log(flowContext, Level.ERROR, message, cause); - } - - protected final void log(FlowContext flowContext, Level level, String message, Throwable cause) { - message = String.format("%s [%s]", message, flowContext.sessionInfo()); - logger.log(level, message, cause); - } - - protected final void debug(FlowContext flowContext, String message, Object... args) { - log(flowContext, Level.DEBUG, message, args); - } - - protected final void log(FlowContext flowContext, Level level, String message, Object... args) { - if (args.length > 0) { - message = String.format(message, args); - } - - SessionInfo session = flowContext.sessionInfo(); - if (args.length == 0) { - logger.log(level, message + " [{}]", session); - } else { - Object[] logArgs = new Object[args.length + 1]; - for (int i = 0; i < args.length; i++) { - logArgs[i] = args[i]; - } - - logArgs[logArgs.length - 1] = session; - logger.log(level, message + " [{}]", logArgs); - } - } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Apr 06 14:13:39 2020 +0200 @@ -14,11 +14,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import static com.passus.st.client.FlowContext.*; import static com.passus.st.client.FlowError.*; +import static com.passus.st.client.FlowUtils.*; +@Deprecated public abstract class FlowWorkerBase extends FlowWorker { public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; @@ -205,7 +206,7 @@ } protected void connect(FlowContext flowContext, boolean wait) { - flowContext.lock.lock(); + flowContext.lock(); try { flowContext.connectionAttempts++; flowContext.state = STATE_CONNECTING; @@ -216,7 +217,7 @@ } catch (Exception ex) { error(flowContext, ex); } finally { - flowContext.lock.unlock(); + flowContext.signalAndUnlock(); } } @@ -228,7 +229,7 @@ disconnect(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); + debug(logger, flowContext, e.getMessage(), e); } } } @@ -259,20 +260,16 @@ disconnect(flowContext, true); } - protected void disconnect(FlowContext flowContext, boolean removeFlow) { - disconnect(flowContext, removeFlow, true); - } - - protected void disconnect(FlowContext flowContext, boolean removeFlow, boolean wait) { + protected void disconnect(FlowContext flowContext, boolean wait) { if (logger.isDebugEnabled()) { - debug(flowContext, "Disconnect."); + debug(logger, flowContext, "Disconnect."); } long now = timeGenerator.currentTimeMillis(); flowContext.lock(); try { if (trace) { - debug(flowContext, "Disconnecting."); + debug(logger, flowContext, "Disconnecting."); } if (flowContext.state == STATE_DISCONNECTING @@ -287,7 +284,7 @@ onDisconnecting(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { - debug(flowContext, "Error occurred during onDisconnecting calling.", e); + debug(logger, flowContext, "Error occurred during onDisconnecting calling.", e); } } @@ -413,7 +410,7 @@ FlowContext flowContext = flowContext(context); if (flowContext != null) { if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + debug(logger, flowContext, "Channel active (localSocket: {}, remoteSocket: {})", context.getLocalAddress(), context.getRemoteAddress()); } @@ -431,23 +428,22 @@ onConnected(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { - debug(flowContext, "Error occurred during onConnected calling.", e); + debug(logger, flowContext, "Error occurred during onConnected calling.", e); } } } catch (Exception ex) { error(flowContext, ex); } finally { - flowContext.signal(); - flowContext.unlock(); + flowContext.signalAndUnlock(); } } } @Override public void channelInactive(ChannelContext context) throws Exception { - FlowContext flowContext = (FlowContext) context.getFlowContext(); + FlowContext flowContext = context.getFlowContext(); if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); + debug(logger, flowContext, "Channel inactive."); } flowContext.lock(); @@ -457,7 +453,7 @@ onDisconnected(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { - debug(flowContext, "Error occurred during onDisconnected calling.", e); + debug(logger, flowContext, "Error occurred during onDisconnected calling.", e); } } @@ -489,7 +485,7 @@ @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - FlowContext flowContext = (FlowContext) context.getFlowContext(); + FlowContext flowContext = context.getFlowContext(); logger.debug("dataReceived"); flowContext.lock(); try { @@ -512,7 +508,7 @@ } if (logger.isDebugEnabled()) { - debug(flowContext, "Decoder error. " + decoder.getLastError()); + debug(logger, flowContext, "Decoder error. " + decoder.getLastError()); } decoder.clear(flowContext); @@ -535,7 +531,7 @@ try { fireResponseReceived(req, resp, flowContext); } catch (Exception e) { - error(flowContext, e.getMessage(), e); + //error(flowContext, e.getMessage(), e); } } @@ -550,7 +546,7 @@ } if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); + debug(logger, flowContext, e.getMessage(), e); } error(flowContext, FlowError.unknownError()); @@ -602,13 +598,13 @@ logger.debug("Error occurred. ", cause); } - FlowContext flowContext = (FlowContext) context.getFlowContext(); + FlowContext flowContext = context.getFlowContext(); //Jezeli nie nastapilo polaczenie flowContext == null if (flowContext == null) { flowContext = flowContext(context); } - flowContext.lock.lock(); + flowContext.lock(); try { if (flowContext.state == STATE_CONNECTING) { if (flowContext.connectionAttempts <= maxConnectionAttempts) { @@ -635,7 +631,7 @@ } } } finally { - flowContext.lock.unlock(); + flowContext.signalAndUnlock(); } error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); @@ -659,7 +655,7 @@ } catch (Exception e) { flowContext.encoderErrors++; if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); + debug(logger, flowContext, e.getMessage(), e); } if (flowContext.encoderErrors == maxEncoderErrors) { @@ -694,7 +690,7 @@ } catch (Exception e) { flowContext.sendErrors++; if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); + debug(logger, flowContext, e.getMessage(), e); } if (flowContext.sendErrors == maxSendErrors) { @@ -709,58 +705,6 @@ } } - protected boolean waitForWriteEnd(FlowContext flowContext) throws InterruptedException { - return waitForWriteEnd(flowContext, timeouts.getDefaultTimeout()); - } - - protected boolean waitForWriteEnd(FlowContext flowContext, long timeout) throws InterruptedException { - long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); - while (flowContext.writeEndTime == -1 && !flowContext.isError()) { - if (timeNanos <= 0) { - return false; - } - - timeNanos = flowContext.lockCond.awaitNanos(timeNanos); - } - - return true; - } - - protected boolean waitForResponse(FlowContext flowContext) throws InterruptedException { - return waitForResponse(flowContext, timeouts.getDefaultTimeout()); - } - - protected boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException { - logger.debug("waitForResponse"); - long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); - while (flowContext.sentEvent != null && !flowContext.isError()) { - if (timeNanos <= 0) { - return false; - } - - timeNanos = flowContext.lockCond.awaitNanos(timeNanos); - } - - return true; - } - - protected boolean waitOpFinished(FlowContext flowContext, byte neededFlags) throws InterruptedException { - return waitOpFinished(flowContext, neededFlags, timeouts.getDefaultTimeout()); - } - - protected boolean waitOpFinished(FlowContext flowContext, byte stateNeeded, long timeout) throws InterruptedException { - long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); - while ((flowContext.state & stateNeeded) != stateNeeded && !flowContext.isError()) { - if (timeNanos <= 0) { - return false; - } - - timeNanos = flowContext.lockCond.awaitNanos(timeNanos); - } - - return true; - } - protected void processFlowSessionStatusEvent(SessionStatusEvent statusEvent, boolean wait) { FlowContext flowContext = null; try { @@ -778,22 +722,7 @@ } } - protected boolean mayReconnect(FlowContext flowContext) { - if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) { - return false; - } - return flowContext.error != null - && (flowContext.error.code() == CODE_CONNECTION_RESET_BY_PEER - || flowContext.error.code() == CODE_CONNECTION_CLOSED_UNEXPECTEDLY - || flowContext.error.code() == CODE_IDE_TIMEOUT); - } - - protected boolean checkMayConnectIfPartial(FlowContext flowContext) { - return connectPartialSession - && !flowContext.sessionEstablishedSeen - && !flowContext.isError(); - } protected void processTimeouts() { try { @@ -805,7 +734,7 @@ for (FlowContext flowContext : sessions.values()) { if (flowContext.timeouted(now)) { if (logger.isDebugEnabled()) { - debug(flowContext, "Flow for session '{}' timed out (state '{}').", + debug(logger, flowContext, "Flow for session '{}' timed out (state '{}').", flowContext.sessionInfo(), stateToString(flowContext.state())); }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Mon Apr 06 14:13:39 2020 +0200 @@ -11,15 +11,20 @@ import java.util.Deque; import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; -@Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class ParallelFlowWorker extends FlowWorkerBase { +import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowUtils.waitOpFinished; - public static final String TYPE = "parallel"; +@Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) +public class ParallelFlowWorker { - public static final int DEFAULT_MAX_SENT_REQUESTS = 10; + public static final String TYPE = "parallel"; + + /*public static final int DEFAULT_MAX_SENT_REQUESTS = 10; private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); @@ -84,7 +89,7 @@ } private void waitCloseAllConnections() { - /*closeAllConnections = true; + *//*closeAllConnections = true; synchronized (lock) { while (!flowIndex.isEmpty()) { try { @@ -94,11 +99,11 @@ } } - closeAllConnections = false;*/ + closeAllConnections = false;*//* throw new RuntimeException("Not implemented."); } -/* @Override +*//* @Override protected boolean send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy //najmniej uzywane. @@ -124,7 +129,7 @@ return super.send(flowContext, event); - }*/ + }*//* private boolean canSend(FlowContext flowContext) { return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent(); @@ -133,11 +138,11 @@ @Override protected void flowStateChanged(FlowContext flowContext, int oldState) { LocalFlowContext localFlowContext = (LocalFlowContext) flowContext; - /*if (oldState == FlowContext.STATE_REQ_SENT) { + *//*if (oldState == FlowContext.STATE_REQ_SENT) { if (semaphore.availablePermits() <= maxSentRequests) { semaphore.release(); } - }*/ + }*//* if (closeAllConnections) { if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING @@ -284,7 +289,102 @@ } } - protected static class LocalFlowContext extends FlowContext { + protected class FlowThread extends Thread { + + private final FlowContext flowContext; + + private final Timeouts timeouts; + + private final BlockingQueue<Event> events; + + public FlowThread(FlowContext flowContext, Timeouts timeouts, int queueSize) { + this.flowContext = flowContext; + this.timeouts = timeouts; + this.events = new ArrayBlockingQueue<>(queueSize); + } + + protected void connect(FlowContext flowContext, boolean wait) { + flowContext.lock(); + try { + flowContext.connectionAttempts++; + flowContext.state = STATE_CONNECTING; + emitter.connect(flowContext.session, this, index); + if (wait) { + waitOpFinished(flowContext, STATE_CONNECTED); + } + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signalAndUnlock(); + } + } + + protected void disconnect(FlowContext flowContext) { + disconnect(flowContext, true); + } + + protected void disconnect(FlowContext flowContext, boolean wait) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Disconnect."); + } + + long now = timeGenerator.currentTimeMillis(); + flowContext.lock(); + try { + if (trace) { + debug(flowContext, "Disconnecting."); + } + + if (flowContext.state == STATE_DISCONNECTING + || flowContext.state == STATE_DISCONNECTED) { + return; + } + + flowContext.state = STATE_DISCONNECTING; + flowContext.timeout = now + timeouts.getDisconnectingTimeout(); + + try { + onDisconnecting(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Error occurred during onDisconnecting calling.", e); + } + } + + if (flowContext.channelContext() != null) { + try { + flowContext.channelContext().close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + + if (wait) { + waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); + } + } catch (InterruptedException e) { + error(flowContext, e); + } finally { + flowContext.signalAndUnlock(); + } + } + + protected void error(FlowContext flowContext, Throwable cause) { + error(flowContext, FlowError.interpret(cause)); + } + + protected void error(FlowContext flowContext, FlowError error) { + if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) { + disconnect(flowContext, false); + } + + flowContext.error(error); + } + } + + protected class LocalFlowContext extends FlowContext { private final Queue<Event> eventsQueue; @@ -293,6 +393,6 @@ eventsQueue = new LinkedList<>(); } - } + }*/ }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon Apr 06 14:13:39 2020 +0200 @@ -2,30 +2,50 @@ import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; +import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.STATE_CONNECTED; +import static com.passus.st.client.FlowContext.STATE_DISCONNECTED; +import static com.passus.st.client.FlowUtils.*; @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class SynchFlowWorker extends FlowWorkerBase { +public class SynchFlowWorker extends FlowWorker { protected final Logger logger = LogManager.getLogger(getClass()); + public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; + public static final String TYPE = "synch"; + private volatile boolean working = false; + + protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); + private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); private long eventsQueueWaitTime = 100; private int loop = 0; + private FlowProcessor flowProcessor; + + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + + private long lastEventTimestamp = -1; + public SynchFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); + SynchWrapper supervisor = new SynchWrapper(this); + flowProcessor = new FlowProcessor(supervisor, logger, index); } @Override @@ -42,9 +62,124 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } + protected FlowContext createFlowContext(SessionInfo session) { + FlowContext flowContext = new FlowContext(session); + flowContext.createLock(); + return flowContext; + } + + @Override + public int activeConnections() { + int count = 0; + for (FlowContext flowContext : sessions.values()) { + flowContext.lock(); + try { + if (flowContext.state() != STATE_DISCONNECTED) { + count++; + } + } finally { + flowContext.unlock(); + } + } + + return count; + } + + protected FlowContext register(SessionEvent sessionEvent) { + return register(sessionEvent.getSessionInfo()); + } + + protected FlowContext register(SessionInfo session) { + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. Session already registered."); + return null; + } + + FlowContext flowContext = createFlowContext(session); + //TODO Malo optymalne + FlowHandler client = clientFactory.create(session.getProtocolId()); + client.init(flowContext); + flowContext.client(client); + sessions.put(session, flowContext); + return flowContext; + } + + protected FlowContext flowContext(SessionEvent event) { + return flowContext(event.getSessionInfo()); + } + + protected FlowContext flowContext(ChannelContext context) { + return flowContext(context.getSessionInfo()); + } + + protected FlowContext flowContext(SessionInfo session) { + FlowContext context = sessions.get(session); + if (context == null) { + if (logger.isDebugEnabled()) { + logger.debug("Context for session '" + session + "' not found."); + } + } + + return context; + } + + protected FlowContext registerAndConnect(SessionInfo session, boolean wait) { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + if (flowContext.blocked) { + return flowContext; + } + + throw new RuntimeException("Not implemented yet."); + } else { + flowContext = register(session); + } + + flowProcessor.connect(flowContext, wait); + return flowContext; + } + + protected void disconnectAllConnections(boolean wait) { + for (FlowContext flowContext : sessions.values()) { + flowProcessor.disconnect(flowContext, wait); + } + } + + @Override + public void disconnect(SessionInfo session) { + try { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + flowProcessor.disconnect(flowContext, true); + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + + @Override + public void disconnect() { + for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { + FlowContext flowContext = entry.getValue(); + try { + flowProcessor.disconnect(flowContext, true); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, e.getMessage(), e); + } + } + } + + eventsQueue.clear(); + sessions.clear(); + working = false; + } + @Override public void handle(Event event) { - Event newEvent = eventInstanceForWorker(event); + Event newEvent = eventInstanceForWorker(event, index); try { eventsQueue.put(newEvent); } catch (Exception e) { @@ -52,6 +187,42 @@ } } + protected void errorInternal(FlowContext flowContext, Throwable cause) { + if (flowContext == null) { + logger.error("Internal error.", cause); + } else { + logger.error("Flow {} internal error.", flowContext, cause); + flowProcessor.error(flowContext, FlowError.internalError(cause)); + } + } + + protected void processFlowSessionStatusEvent(SessionStatusEvent statusEvent, boolean wait) { + FlowContext flowContext = null; + try { + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + flowContext = registerAndConnect(statusEvent.getSessionInfo(), wait); + flowContext.sessionEstablishedSeen = true; + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + flowContext = flowContext(statusEvent); + if (flowContext != null) { + flowProcessor.disconnect(flowContext, wait); + } + } + } catch (Exception e) { + errorInternal(flowContext, e); + } + } + + protected void sleep(Event event) { + if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (lastEventTimestamp != -1) { + long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + sleepSilently(timeToSleep); + } + lastEventTimestamp = event.getTimestamp(); + } + } + /** * Returns true if next event should be processed immediately. * @@ -65,9 +236,9 @@ if (event instanceof SessionEvent) { SessionEvent sessEvent = (SessionEvent) event; - if (isBlockedSession(sessEvent.getSessionInfo())) { + /*if (isBlockedSession(sessEvent.getSessionInfo())) { return; - } + }*/ if (event.getType() == SessionStatusEvent.TYPE) { SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent; @@ -80,17 +251,17 @@ return; } else if (mayReconnect(flowContext)) { if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnecting."); + debug(logger, flowContext, "Reconnecting."); } - connect(flowContext, true); + flowProcessor.connect(flowContext, true); } } else if (connectPartialSession) { flowContext = registerAndConnect(sessEvent.getSessionInfo(), true); } if (flowContext.state == STATE_CONNECTED) { - send(flowContext, (SessionPayloadEvent) event, true); + flowProcessor.send(flowContext, (SessionPayloadEvent) event, true); } } } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) { @@ -98,7 +269,7 @@ logger.debug("DataLoopEnd received."); } - disconnectAllConnections(); + disconnectAllConnections(true); filterChain.reset(); } else if (event.getType() == DataEvents.DataEnd.TYPE) { if (logger.isDebugEnabled()) { @@ -110,12 +281,6 @@ } @Override - public void disconnect() { - super.disconnect(); - eventsQueue.clear(); - } - - @Override public void run() { working = true; while (working) { @@ -135,5 +300,32 @@ } } + class SynchWrapper extends AbstractFlowProcessorSupervisorWrapper { + public SynchWrapper(FlowWorker flowWorker) { + super(flowWorker); + } + + @Override + public FlowContext flowContext(SessionInfo session) { + FlowContext context = sessions.get(session); + if (context == null) { + if (logger.isDebugEnabled()) { + logger.debug("Context for session '" + session + "' not found."); + } + } + + return context; + } + + @Override + public void onDisconnected(FlowContext flowContext) { + sessions.remove(flowContext.session); + } + + @Override + public void onResponseReceived(FlowContext flowContext, Object response) { + fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext); + } + } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientListener.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientListener.java Mon Apr 06 14:13:39 2020 +0200 @@ -12,7 +12,7 @@ default void responseReceived(Object request, Object response, FlowContext context) { if (request instanceof HttpRequest - && response instanceof HttpResponse) { + || response instanceof HttpResponse) { responseReceived((HttpRequest) request, (HttpResponse) response, context); } }
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Mon Apr 06 14:13:39 2020 +0200 @@ -1,21 +1,12 @@ package com.passus.st.client; -import com.passus.commons.service.ServiceUtils; import com.passus.st.AbstractWireMockTest; import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; -import com.passus.st.utils.EventUtils; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; public class ParallelFlowWorkerTest extends AbstractWireMockTest { @@ -38,9 +29,9 @@ .withBody(content))); } - @Test + @Test(enabled = false) public void testHandle() throws Exception { - Map<String, Object> props = new HashMap<>(); + /* Map<String, Object> props = new HashMap<>(); props.put("allowPartialSession", true); props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); @@ -67,7 +58,7 @@ assertTrue(responseStr.endsWith("test")); } finally { ServiceUtils.stopQuietly(emitter); - } + }*/ }
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Fri Apr 03 15:08:47 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Mon Apr 06 14:13:39 2020 +0200 @@ -9,6 +9,8 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseEncoder; +import com.passus.st.Log4jConfigurationFactory; +import com.passus.st.client.SynchFlowWorker.SynchWrapper; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import com.passus.st.utils.EventUtils; @@ -26,7 +28,7 @@ import static org.testng.AssertJUnit.assertFalse; public class SynchFlowWorkerTest { - + public static final long JOIN_TIMEOUT = Long.MAX_VALUE; private final TestHttpClientListener listener = new TestHttpClientListener(); @@ -72,7 +74,10 @@ protected void flush(LocalChannelContext channelContext) { SessionInfo sessionInfo = channelContext.getSessionInfo(); - SynchFlowWorker clientWorker = (SynchFlowWorker) channelContext.handler; + FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler; + SynchWrapper wrapper = (SynchWrapper) flowProcessor.getSupervisor(); + SynchFlowWorker clientWorker = (SynchFlowWorker) wrapper.getFlowWorker(); + FlowContext flowContext = clientWorker.flowContext(sessionInfo); SessionPayloadEvent event = flowContext.sentEvent(); @@ -83,7 +88,7 @@ executor.execute(() -> { try { - clientWorker.dataReceived(channelContext, buff); + flowProcessor.dataReceived(channelContext, buff); } catch (Exception ex) { ex.printStackTrace(); }