Mercurial > stress-tester
changeset 1106:057627ea54d5
Timeouts unification
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Tue May 19 13:25:11 2020 +0200 @@ -71,13 +71,17 @@ } protected void connect(FlowContext flowContext, boolean wait) { + if (logger.isDebugEnabled()) { + logger.debug("Connecting " + flowContext.sessionInfo() + "."); + } + flowContext.lock(); try { flowContext.connectionAttempts++; flowContext.state = STATE_CONNECTING; emitter.connect(flowContext.session, this, workerIndex); if (wait) { - waitOpFinished(flowContext, STATE_CONNECTED); + waitOpFinished(flowContext, STATE_CONNECTED, Long.MAX_VALUE); } } catch (Exception ex) { error(flowContext, ex); @@ -108,7 +112,7 @@ } flowContext.state = STATE_DISCONNECTING; - flowContext.timeout = now + timeouts.getDisconnectingTimeout(); + flowContext.timeout = now + timeouts.getDisconnectTimeout(); try { supervisor.onDisconnecting(flowContext); @@ -129,7 +133,7 @@ } if (wait) { - waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); + waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectTimeout()); } flowContext.client().destroy(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Tue May 19 13:25:11 2020 +0200 @@ -283,7 +283,7 @@ } flowContext.state = STATE_DISCONNECTING; - flowContext.timeout = now + timeouts.getDisconnectingTimeout(); + flowContext.timeout = now + timeouts.getDisconnectTimeout(); try { onDisconnecting(flowContext); @@ -304,7 +304,7 @@ } if (wait) { - waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); + waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectTimeout()); } } catch (InterruptedException e) { error(flowContext, e);
--- a/stress-tester/src/main/java/com/passus/st/client/Timeouts.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java Tue May 19 13:25:11 2020 +0200 @@ -3,20 +3,22 @@ public class Timeouts { public static final long DEFAULT_TIMEOUT = 10_000L; - public static final long DEFAULT_CONNECTING_TIMEOUT = 10_000L; - public static final long DEFAULT_DISCONNECTING = 10_000L; + public static final long DEFAULT_CONNECTION_TIMEOUT = 5_000L; + public static final long DEFAULT_READ_TIMEOUT = 20_000L; + public static final long DEFAULT_DISCONNECT_TIMEOUT = 5_000L; private long defaultTimeout = DEFAULT_TIMEOUT; - private long connectingTimeout = DEFAULT_CONNECTING_TIMEOUT; - private long disconnectingTimeout = DEFAULT_DISCONNECTING; + private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + private long readTimeout = DEFAULT_READ_TIMEOUT; + private long disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT; public Timeouts() { } public Timeouts(Timeouts timeouts) { this.defaultTimeout = defaultTimeout; - this.connectingTimeout = connectingTimeout; - this.disconnectingTimeout = disconnectingTimeout; + this.connectionTimeout = connectionTimeout; + this.disconnectTimeout = disconnectTimeout; } public long getDefaultTimeout() { @@ -27,19 +29,27 @@ this.defaultTimeout = defaultTimeout; } - public long getConnectingTimeout() { - return connectingTimeout; - } - - public void setConnectingTimeout(long connectingTimeout) { - this.connectingTimeout = connectingTimeout; + public long getConnectionTimeout() { + return connectionTimeout; } - public long getDisconnectingTimeout() { - return disconnectingTimeout; + public void setConnectionTimeout(long connectionTimeout) { + this.connectionTimeout = connectionTimeout; } - public void setDisconnectingTimeout(long disconnectingTimeout) { - this.disconnectingTimeout = disconnectingTimeout; + public long getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(long readTimeout) { + this.readTimeout = readTimeout; + } + + public long getDisconnectTimeout() { + return disconnectTimeout; + } + + public void setDisconnectTimeout(long disconnectTimeout) { + this.disconnectTimeout = disconnectTimeout; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java Tue May 19 13:25:11 2020 +0200 @@ -0,0 +1,21 @@ +package com.passus.st.config; + +import com.passus.config.schema.NodeDefinition; +import com.passus.config.validation.LongValidator; + +import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefInteger; +import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefLong; + +public class CommonNodeDefs { + + public static NodeDefinition INT_GREATER_THAN_ZERO = valueDefInteger().addValidator(LongValidator.GREATER_ZERO); + + public static NodeDefinition LONG_GREATER_THAN_ZERO = valueDefLong().addValidator(LongValidator.GREATER_ZERO); + + public static NodeDefinition INT_GREATER_EQUAL_ZERO = valueDefInteger().addValidator(LongValidator.GREATER_EQUAL_ZERO); + + public static NodeDefinition LONG_GREATER_EQUAL_ZERO = valueDefLong().addValidator(LongValidator.GREATER_EQUAL_ZERO); + + private CommonNodeDefs() { + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/config/TimeoutsNodeDefCreator.java Tue May 19 13:25:11 2020 +0200 @@ -0,0 +1,32 @@ +package com.passus.st.config; + +import com.passus.config.schema.NodeDefinition; +import com.passus.config.schema.NodeDefinitionCreator; +import com.passus.st.client.Timeouts; + +import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; +import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; +import static com.passus.st.config.CommonNodeDefs.LONG_GREATER_THAN_ZERO; + +public class TimeoutsNodeDefCreator implements NodeDefinitionCreator { + + private final Timeouts defaultTimeouts; + + public TimeoutsNodeDefCreator() { + this(null); + } + + public TimeoutsNodeDefCreator(Timeouts defaultTimeouts) { + this.defaultTimeouts = defaultTimeouts; + } + + @Override + public NodeDefinition create() { + return mapDef( + tupleDef("connect", LONG_GREATER_THAN_ZERO).setRequired(false), + tupleDef("read", LONG_GREATER_THAN_ZERO).setRequired(false), + tupleDef("disconnect", LONG_GREATER_THAN_ZERO).setRequired(false) + ).setTransformer(new TimeoutsRuleNodeTransformer(defaultTimeouts)); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/config/TimeoutsRuleNodeTransformer.java Tue May 19 13:25:11 2020 +0200 @@ -0,0 +1,61 @@ +package com.passus.st.config; + +import com.passus.config.*; +import com.passus.config.schema.NodeTransformer; +import com.passus.config.validation.Errors; +import com.passus.st.client.Timeouts; + +import java.util.List; + +import static com.passus.config.ConfigurationUtils.extractLong; + +public class TimeoutsRuleNodeTransformer implements NodeTransformer<CNode> { + + private final Timeouts defaultTimeouts; + + public TimeoutsRuleNodeTransformer() { + this(null); + } + + public TimeoutsRuleNodeTransformer(Timeouts defaultTimeouts) { + this.defaultTimeouts = defaultTimeouts; + } + + @Override + public CNode transform(CNode node, Errors errors, ConfigurationContext context) { + CMapNode mapNode = (CMapNode) node; + + Timeouts timeouts = defaultTimeouts == null ? new Timeouts() : new Timeouts(defaultTimeouts); + List<CTupleNode> tuples = mapNode.getChildren(); + if (!tuples.isEmpty()) { + for (CTupleNode tuple : tuples) { + String opName = tuple.getName(); + try { + switch (opName.toLowerCase()) { + case "connect": + timeouts.setConnectionTimeout(extractLong(tuple)); + break; + case "read": + timeouts.setReadTimeout(extractLong(tuple)); + break; + case "disconnect": + timeouts.setDisconnectTimeout(extractLong(tuple)); + break; + default: + throw new IllegalArgumentException("Unknown parameter '" + opName + "'."); + } + } catch (Exception ex) { + throw new IllegalArgumentException(ex.getMessage(), ex); + } + } + } + + return new CValueNode(timeouts); + } + + @Override + public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) { + throw new UnsupportedOperationException("Impossible."); // Predicate + } + +} \ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Tue May 19 13:25:11 2020 +0200 @@ -5,7 +5,10 @@ import com.passus.config.Configuration; import com.passus.config.ConfigurationContext; import com.passus.config.schema.MapNodeDefinition; +import com.passus.config.schema.NodeDefinition; import com.passus.config.schema.NodeDefinitionCreator; +import com.passus.st.client.Timeouts; +import com.passus.st.config.TimeoutsNodeDefCreator; import com.passus.st.metric.MetricSource; import java.io.IOException; @@ -19,25 +22,35 @@ SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper(); + Timeouts DEFAULT_TIMEOUTS = new Timeouts(); + void setSessionMapper(SessionMapper mapper); SessionMapper getSessionMapper(); + void setTimeouts(Timeouts timeouts); + + Timeouts getTimeouts(); + void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException; @Override default void configure(Configuration config, ConfigurationContext context) { setSessionMapper((SessionMapper) config.get("sessionMapper")); setCollectMetrics(config.get("collectMetrics", DEFAULT_COLLECT_METRICS)); + setTimeouts(config.get("timeouts", DEFAULT_TIMEOUTS)); } abstract class EmitterNodeDefCreator implements NodeDefinitionCreator { @Override public MapNodeDefinition create() { + NodeDefinition timeoutsNodeDef = new TimeoutsNodeDefCreator(DEFAULT_TIMEOUTS).create(); + return mapDef( tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false), - tupleDef("collectMetrics", valueDefBool()).setRequired(false) + tupleDef("collectMetrics", valueDefBool()).setRequired(false), + tupleDef("timeouts", timeoutsNodeDef).setRequired(false) ); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Tue May 19 13:25:11 2020 +0200 @@ -110,7 +110,7 @@ boolean timeouted = false; while (!channelContext.finishConnect()) { long now = System.currentTimeMillis(); - if (now - connStart < connectionTimeout) { + if (now - connStart < timeouts.getConnectionTimeout()) { timeouted = true; break; } @@ -122,7 +122,7 @@ logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); } - throw new IOException("Connection timed out (" + connectionTimeout + "ms)."); + throw new IOException("Connection timed out (" + timeouts.getConnectionTimeout() + "ms)."); } } catch (Exception e) { doCatchException(key, e);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Tue May 19 13:25:11 2020 +0200 @@ -1,6 +1,7 @@ package com.passus.st.emitter.nio; import com.passus.config.ConfigurationContext; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.SessionMapper; import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; @@ -51,9 +52,9 @@ private boolean collectMetrics = DEFAULT_COLLECT_METRICS; - private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + private final Class<? extends NioEmitterWorker> workerClass; - private final Class<? extends NioEmitterWorker> workerClass; + private Timeouts timeouts = DEFAULT_TIMEOUTS; public NioEmitter() { this(NioEmitterWorkerImpl.class); @@ -64,6 +65,27 @@ this.workerClass = workerClass; } + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public Timeouts getTimeouts() { + return timeouts; + } + + @Override + public void setTimeouts(Timeouts timeouts) { + Assert.notNull(timeouts, "timeouts"); + this.timeouts = timeouts; + } + public int getMaxThreads() { return maxThreads; } @@ -73,20 +95,6 @@ this.maxThreads = maxThreads; } - public long getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(long connectionTimeout) { - Assert.greaterThanZero(connectionTimeout, "connectionTimeout"); - this.connectionTimeout = connectionTimeout; - if (workers != null) { - for (NioEmitterWorker worker : workers) { - worker.setConnectionTimeout(connectionTimeout); - } - } - } - @Override public boolean isCollectMetrics() { return collectMetrics; @@ -112,16 +120,6 @@ } @Override - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - @Override - public void setSessionMapper(SessionMapper sessionMapper) { - this.sessionMapper = sessionMapper; - } - - @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { for (NioEmitterWorker worker : workers) { @@ -134,7 +132,6 @@ public void configure(Configuration config, ConfigurationContext context) { Emitter.super.configure(config, context); setMaxThreads(config.getInteger("threads", DEFAULT_NUM_THREADS)); - setConnectionTimeout(config.getLong("connectionTimeout", DEFAULT_CONNECTION_TIMEOUT)); } @Override @@ -158,7 +155,7 @@ NioEmitterWorker worker = workerClass.getConstructor(Integer.TYPE).newInstance(i); worker.setSessionMapper(sessionMapper); worker.setCollectMetrics(collectMetrics); - worker.setConnectionTimeout(connectionTimeout); + worker.setTimeouts(timeouts); workers[i] = worker; worker.start(); } catch (Exception e) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Tue May 19 13:25:11 2020 +0200 @@ -1,6 +1,7 @@ package com.passus.st.emitter.nio; import com.passus.commons.Assert; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.EmitterMetric; import com.passus.st.emitter.SessionInfo; @@ -11,6 +12,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import static com.passus.st.emitter.Emitter.DEFAULT_TIMEOUTS; + /** * * @author Mirosław Hawrot @@ -27,9 +30,9 @@ protected volatile boolean collectMetrics = false; - protected long connectionTimeout = 5_000; + protected EmitterMetric metric; - protected EmitterMetric metric; + protected Timeouts timeouts = DEFAULT_TIMEOUTS; protected NioEmitterWorker(int index) throws IOException { super("NioEmitterWorker-" + index); @@ -57,12 +60,13 @@ this.selectTimeout = selectTimeout; } - public long getConnectionTimeout() { - return connectionTimeout; + public Timeouts getTimeouts() { + return timeouts; } - public void setConnectionTimeout(long connectionTimeout) { - this.connectionTimeout = connectionTimeout; + public void setTimeouts(Timeouts timeouts) { + Assert.notNull(timeouts, "timeouts"); + this.timeouts = timeouts; } public SessionMapper getSessionMapper() {
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java Tue May 19 13:25:11 2020 +0200 @@ -246,7 +246,7 @@ boolean timeouted = false; while (!channel.finishConnect()) { long now = System.currentTimeMillis(); - if (now - connStart < connectionTimeout) { + if (now - connStart < timeouts.getConnectionTimeout()) { timeouted = true; break; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java Tue May 19 13:25:11 2020 +0200 @@ -42,6 +42,7 @@ SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); + channel.socket().setSoTimeout((int) timeouts.getReadTimeout()); SocketAddress bindAddress = connParams.getBindAddress(); if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Tue May 19 13:25:11 2020 +0200 @@ -6,6 +6,7 @@ import com.passus.config.schema.MapNodeDefinition; import com.passus.config.validation.LongValidator; import com.passus.net.session.Session; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import org.apache.logging.log4j.LogManager; @@ -47,6 +48,8 @@ protected final boolean trace; + protected Timeouts timeouts = DEFAULT_TIMEOUTS; + public UnidirectionalRawPacketEmitter() { this.trace = logger.isTraceEnabled(); } @@ -61,6 +64,17 @@ return sessionMapper; } + @Override + public Timeouts getTimeouts() { + return timeouts; + } + + @Override + public void setTimeouts(Timeouts timeouts) { + Assert.notNull(timeouts, "timeouts"); + this.timeouts = timeouts; + } + public int getThreads() { return threads; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Tue May 19 13:25:11 2020 +0200 @@ -3,6 +3,7 @@ import com.passus.data.ByteBuff; import com.passus.data.HeapByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.*; import com.passus.st.emitter.SessionMapper.ConnectionParams; import org.apache.logging.log4j.LogManager; @@ -26,6 +27,8 @@ final SessionMapper sessionMapper; + final Timeouts timeouts; + final ConnectionListener listener; SocketAddress localAddress; @@ -40,10 +43,11 @@ ByteBuff buffer = new HeapByteBuff(); - public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) { + public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, boolean collectMetrics, ConnectionListener listener) { this.sessionInfo = sessionInfo; this.handler = handler; this.sessionMapper = sessionMapper; + this.timeouts = timeouts; this.listener = listener; this.collectMetrics = collectMetrics; if (collectMetrics) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Tue May 19 13:25:11 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; import com.passus.st.emitter.SessionMapper; @@ -35,9 +36,9 @@ private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length); public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, - SessionMapper sessionMapper, boolean collectMetrics, - ConnectionListener listener) { - super(sessionInfo, handler, sessionMapper, collectMetrics, listener); + SessionMapper sessionMapper, Timeouts timeouts, + boolean collectMetrics, ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener); } public boolean isConnected() { @@ -88,6 +89,7 @@ } try { + socket.setSoTimeout((int) timeouts.getReadTimeout()); socket.connect(remoteSocket); handler.channelActive(channelContext); } catch (Exception ex) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Tue May 19 13:25:11 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; import com.passus.st.emitter.SessionMapper; @@ -43,9 +44,9 @@ private byte[] readBuffer = new byte[bufferSize]; public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler, - SessionMapper sessionMapper, boolean collectMetrics, - ConnectionListener listener) { - super(sessionInfo, handler, sessionMapper, collectMetrics, listener); + SessionMapper sessionMapper, Timeouts timeouts, + boolean collectMetrics, ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener); } @Override @@ -103,7 +104,8 @@ } try { - socket.connect(remoteSocket); + socket.setSoTimeout((int) timeouts.getReadTimeout()); + socket.connect(remoteSocket, (int) timeouts.getConnectionTimeout()); in = socket.getInputStream(); out = socket.getOutputStream(); handler.channelActive(channelContext);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue May 19 13:25:11 2020 +0200 @@ -1,5 +1,6 @@ package com.passus.st.emitter.socket; +import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; import com.passus.config.Configuration; import com.passus.config.ConfigurationContext; @@ -7,6 +8,7 @@ import com.passus.config.schema.MapNodeDefinition; import com.passus.net.session.Session; import com.passus.st.client.PerNameMetricsContainer; +import com.passus.st.client.Timeouts; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import com.passus.st.plugin.PluginConstants; @@ -37,12 +39,12 @@ private int maxThreads = DEFAULT_NUM_THREADS; - private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; - private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer(); private boolean started; + private Timeouts timeouts = DEFAULT_TIMEOUTS; + @Override public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; @@ -54,6 +56,17 @@ } @Override + public Timeouts getTimeouts() { + return timeouts; + } + + @Override + public void setTimeouts(Timeouts timeouts) { + Assert.notNull(timeouts); + this.timeouts = timeouts; + } + + @Override public boolean isCollectMetrics() { return collectMetrics; } @@ -168,9 +181,9 @@ Connection connection; if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { - connection = new SocketConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener); + connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener); } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { - connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener); + connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener); } else { throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + "."); }
--- a/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java Tue May 19 11:43:47 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java Tue May 19 13:25:11 2020 +0200 @@ -67,6 +67,16 @@ } @Override + public void setTimeouts(Timeouts timeouts) { + + } + + @Override + public Timeouts getTimeouts() { + return null; + } + + @Override public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); try {