changeset 1106:057627ea54d5

Timeouts unification
author Devel 2
date Tue, 19 May 2020 13:25:11 +0200
parents 48296cc7088e
children b626a5120c4f
files stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/Timeouts.java stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java stress-tester/src/main/java/com/passus/st/config/TimeoutsNodeDefCreator.java stress-tester/src/main/java/com/passus/st/config/TimeoutsRuleNodeTransformer.java stress-tester/src/main/java/com/passus/st/emitter/Emitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java
diffstat 18 files changed, 259 insertions(+), 71 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Tue May 19 13:25:11 2020 +0200
@@ -71,13 +71,17 @@
     }
 
     protected void connect(FlowContext flowContext, boolean wait) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Connecting " + flowContext.sessionInfo() + ".");
+        }
+
         flowContext.lock();
         try {
             flowContext.connectionAttempts++;
             flowContext.state = STATE_CONNECTING;
             emitter.connect(flowContext.session, this, workerIndex);
             if (wait) {
-                waitOpFinished(flowContext, STATE_CONNECTED);
+                waitOpFinished(flowContext, STATE_CONNECTED, Long.MAX_VALUE);
             }
         } catch (Exception ex) {
             error(flowContext, ex);
@@ -108,7 +112,7 @@
             }
 
             flowContext.state = STATE_DISCONNECTING;
-            flowContext.timeout = now + timeouts.getDisconnectingTimeout();
+            flowContext.timeout = now + timeouts.getDisconnectTimeout();
 
             try {
                 supervisor.onDisconnecting(flowContext);
@@ -129,7 +133,7 @@
             }
 
             if (wait) {
-                waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout());
+                waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectTimeout());
             }
 
             flowContext.client().destroy(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Tue May 19 13:25:11 2020 +0200
@@ -283,7 +283,7 @@
             }
 
             flowContext.state = STATE_DISCONNECTING;
-            flowContext.timeout = now + timeouts.getDisconnectingTimeout();
+            flowContext.timeout = now + timeouts.getDisconnectTimeout();
 
             try {
                 onDisconnecting(flowContext);
@@ -304,7 +304,7 @@
             }
 
             if (wait) {
-                waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout());
+                waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectTimeout());
             }
         } catch (InterruptedException e) {
             error(flowContext, e);
--- a/stress-tester/src/main/java/com/passus/st/client/Timeouts.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java	Tue May 19 13:25:11 2020 +0200
@@ -3,20 +3,27 @@
 public class Timeouts {
 
     public static final long DEFAULT_TIMEOUT = 10_000L;
-    public static final long DEFAULT_CONNECTING_TIMEOUT = 10_000L;
-    public static final long DEFAULT_DISCONNECTING = 10_000L;
+    public static final long DEFAULT_CONNECTION_TIMEOUT = 5_000L;
+    public static final long DEFAULT_READ_TIMEOUT = 20_000L;
+    public static final long DEFAULT_DISCONNECT_TIMEOUT = 5_000L;
 
     private long defaultTimeout = DEFAULT_TIMEOUT;
-    private long connectingTimeout = DEFAULT_CONNECTING_TIMEOUT;
-    private long disconnectingTimeout = DEFAULT_DISCONNECTING;
+    private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    private long readTimeout = DEFAULT_READ_TIMEOUT;
+    private long disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
 
     public Timeouts() {
     }
 
+    /**
+     * Copy constructor: takes every timeout value from {@code timeouts},
+     * which must not be {@code null}.
+     */
     public Timeouts(Timeouts timeouts) {
-        this.defaultTimeout = defaultTimeout;
-        this.connectingTimeout = connectingTimeout;
-        this.disconnectingTimeout = disconnectingTimeout;
+        this.defaultTimeout = timeouts.defaultTimeout;
+        this.connectionTimeout = timeouts.connectionTimeout;
+        this.readTimeout = timeouts.readTimeout;
+        this.disconnectTimeout = timeouts.disconnectTimeout;
     }
 
     public long getDefaultTimeout() {
@@ -27,19 +34,27 @@
         this.defaultTimeout = defaultTimeout;
     }
 
-    public long getConnectingTimeout() {
-        return connectingTimeout;
-    }
-
-    public void setConnectingTimeout(long connectingTimeout) {
-        this.connectingTimeout = connectingTimeout;
+    public long getConnectionTimeout() {
+        return connectionTimeout;
     }
 
-    public long getDisconnectingTimeout() {
-        return disconnectingTimeout;
+    public void setConnectionTimeout(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
     }
 
-    public void setDisconnectingTimeout(long disconnectingTimeout) {
-        this.disconnectingTimeout = disconnectingTimeout;
+    public long getReadTimeout() {
+        return readTimeout;
+    }
+
+    public void setReadTimeout(long readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
+    public long getDisconnectTimeout() {
+        return disconnectTimeout;
+    }
+
+    public void setDisconnectTimeout(long disconnectTimeout) {
+        this.disconnectTimeout = disconnectTimeout;
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java	Tue May 19 13:25:11 2020 +0200
@@ -0,0 +1,31 @@
+package com.passus.st.config;
+
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.validation.LongValidator;
+
+import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefInteger;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefLong;
+
+/**
+ * Shared value-node definitions for numeric configuration entries.
+ * NOTE(review): the NodeDefinition objects themselves may still be mutable;
+ * only the references are protected by {@code final} - confirm.
+ */
+public final class CommonNodeDefs {
+
+    /** Integer value validated with {@code LongValidator.GREATER_ZERO}. */
+    public static final NodeDefinition INT_GREATER_THAN_ZERO = valueDefInteger().addValidator(LongValidator.GREATER_ZERO);
+
+    /** Long value validated with {@code LongValidator.GREATER_ZERO}. */
+    public static final NodeDefinition LONG_GREATER_THAN_ZERO = valueDefLong().addValidator(LongValidator.GREATER_ZERO);
+
+    /** Integer value validated with {@code LongValidator.GREATER_EQUAL_ZERO}. */
+    public static final NodeDefinition INT_GREATER_EQUAL_ZERO = valueDefInteger().addValidator(LongValidator.GREATER_EQUAL_ZERO);
+
+    /** Long value validated with {@code LongValidator.GREATER_EQUAL_ZERO}. */
+    public static final NodeDefinition LONG_GREATER_EQUAL_ZERO = valueDefLong().addValidator(LongValidator.GREATER_EQUAL_ZERO);
+
+    /** Constant holder; never instantiated. */
+    private CommonNodeDefs() {
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/TimeoutsNodeDefCreator.java	Tue May 19 13:25:11 2020 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.config;
+
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
+import com.passus.st.client.Timeouts;
+
+import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.st.config.CommonNodeDefs.LONG_GREATER_THAN_ZERO;
+
+/**
+ * Creates the schema definition of a timeouts map: optional {@code connect},
+ * {@code read} and {@code disconnect} entries, each using
+ * {@code LONG_GREATER_THAN_ZERO}, converted by a {@link TimeoutsRuleNodeTransformer}.
+ */
+public class TimeoutsNodeDefCreator implements NodeDefinitionCreator {
+
+    private final Timeouts defaultTimeouts;
+
+    /** Creator without default timeouts. */
+    public TimeoutsNodeDefCreator() {
+        this(null);
+    }
+
+    /** @param defaultTimeouts defaults handed to the transformer; may be {@code null} */
+    public TimeoutsNodeDefCreator(Timeouts defaultTimeouts) {
+        this.defaultTimeouts = defaultTimeouts;
+    }
+
+    @Override
+    public NodeDefinition create() {
+        TimeoutsRuleNodeTransformer transformer = new TimeoutsRuleNodeTransformer(defaultTimeouts);
+
+        return mapDef(
+                tupleDef("connect", LONG_GREATER_THAN_ZERO).setRequired(false),
+                tupleDef("read", LONG_GREATER_THAN_ZERO).setRequired(false),
+                tupleDef("disconnect", LONG_GREATER_THAN_ZERO).setRequired(false)
+        ).setTransformer(transformer);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/TimeoutsRuleNodeTransformer.java	Tue May 19 13:25:11 2020 +0200
@@ -0,0 +1,74 @@
+package com.passus.st.config;
+
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.st.client.Timeouts;
+
+import java.util.List;
+import java.util.Locale;
+
+import static com.passus.config.ConfigurationUtils.extractLong;
+
+/**
+ * Turns a map node with {@code connect}, {@code read} and {@code disconnect}
+ * entries into a value node holding a {@link Timeouts}.
+ */
+public class TimeoutsRuleNodeTransformer implements NodeTransformer<CNode> {
+
+    private final Timeouts defaultTimeouts;
+
+    public TimeoutsRuleNodeTransformer() {
+        this(null);
+    }
+
+    public TimeoutsRuleNodeTransformer(Timeouts defaultTimeouts) {
+        this.defaultTimeouts = defaultTimeouts;
+    }
+
+    /**
+     * Starts from a new {@link Timeouts} (created through its copy constructor
+     * when defaults were given) and applies every tuple found in the map node.
+     *
+     * @throws IllegalArgumentException for an unknown entry name or when
+     *                                  handling an entry fails
+     */
+    @Override
+    public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
+        CMapNode mapNode = (CMapNode) node;
+
+        Timeouts timeouts = defaultTimeouts == null ? new Timeouts() : new Timeouts(defaultTimeouts);
+        List<CTupleNode> tuples = mapNode.getChildren();
+        for (CTupleNode tuple : tuples) {
+            String opName = tuple.getName();
+            try {
+                // Locale.ROOT keeps the lookup independent of the JVM default locale
+                // (a Turkish locale lower-cases 'I' to a dotless i, breaking "disconnect").
+                switch (opName.toLowerCase(Locale.ROOT)) {
+                    case "connect":
+                        timeouts.setConnectionTimeout(extractLong(tuple));
+                        break;
+                    case "read":
+                        timeouts.setReadTimeout(extractLong(tuple));
+                        break;
+                    case "disconnect":
+                        timeouts.setDisconnectTimeout(extractLong(tuple));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unknown parameter '" + opName + "'.");
+                }
+            } catch (Exception ex) {
+                throw new IllegalArgumentException(ex.getMessage(), ex);
+            }
+        }
+
+        return new CValueNode(timeouts);
+    }
+
+    /** Reverse transformation is not supported; always throws. */
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
+        throw new UnsupportedOperationException("Impossible.");
+    }
+
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Tue May 19 13:25:11 2020 +0200
@@ -5,7 +5,10 @@
 import com.passus.config.Configuration;
 import com.passus.config.ConfigurationContext;
 import com.passus.config.schema.MapNodeDefinition;
+import com.passus.config.schema.NodeDefinition;
 import com.passus.config.schema.NodeDefinitionCreator;
+import com.passus.st.client.Timeouts;
+import com.passus.st.config.TimeoutsNodeDefCreator;
 import com.passus.st.metric.MetricSource;
 
 import java.io.IOException;
@@ -19,25 +22,35 @@
 
     SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper();
 
+    Timeouts DEFAULT_TIMEOUTS = new Timeouts();
+
     void setSessionMapper(SessionMapper mapper);
 
     SessionMapper getSessionMapper();
 
+    void setTimeouts(Timeouts timeouts);
+
+    Timeouts getTimeouts();
+
     void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException;
 
     @Override
     default void configure(Configuration config, ConfigurationContext context) {
         setSessionMapper((SessionMapper) config.get("sessionMapper"));
         setCollectMetrics(config.get("collectMetrics", DEFAULT_COLLECT_METRICS));
+        setTimeouts(config.get("timeouts", DEFAULT_TIMEOUTS));
     }
 
     abstract class EmitterNodeDefCreator implements NodeDefinitionCreator {
 
         @Override
         public MapNodeDefinition create() {
+            NodeDefinition timeoutsNodeDef = new TimeoutsNodeDefCreator(DEFAULT_TIMEOUTS).create();
+
             return mapDef(
                     tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false),
-                    tupleDef("collectMetrics", valueDefBool()).setRequired(false)
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false),
+                    tupleDef("timeouts", timeoutsNodeDef).setRequired(false)
             );
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Tue May 19 13:25:11 2020 +0200
@@ -110,7 +110,7 @@
             boolean timeouted = false;
             while (!channelContext.finishConnect()) {
                 long now = System.currentTimeMillis();
-                if (now - connStart < connectionTimeout) {
+                if (now - connStart >= timeouts.getConnectionTimeout()) { // time out once the limit has elapsed, not before; assumes connStart is the start timestamp - confirm
                     timeouted = true;
                     break;
                 }
@@ -122,7 +122,7 @@
                     logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
                 }
 
-                throw new IOException("Connection timed out (" + connectionTimeout + "ms).");
+                throw new IOException("Connection timed out (" + timeouts.getConnectionTimeout() + "ms).");
             }
         } catch (Exception e) {
             doCatchException(key, e);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Tue May 19 13:25:11 2020 +0200
@@ -1,6 +1,7 @@
 package com.passus.st.emitter.nio;
 
 import com.passus.config.ConfigurationContext;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.SessionMapper;
 import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
@@ -51,9 +52,9 @@
 
     private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
-    private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    private final Class<? extends NioEmitterWorker> workerClass;
 
-    private final Class<? extends NioEmitterWorker> workerClass;
+    private Timeouts timeouts = DEFAULT_TIMEOUTS;
 
     public NioEmitter() {
         this(NioEmitterWorkerImpl.class);
@@ -64,6 +65,27 @@
         this.workerClass = workerClass;
     }
 
+    @Override
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    @Override
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public Timeouts getTimeouts() {
+        return timeouts;
+    }
+
+    @Override
+    public void setTimeouts(Timeouts timeouts) {
+        Assert.notNull(timeouts, "timeouts");
+        this.timeouts = timeouts;
+    }
+
     public int getMaxThreads() {
         return maxThreads;
     }
@@ -73,20 +95,6 @@
         this.maxThreads = maxThreads;
     }
 
-    public long getConnectionTimeout() {
-        return connectionTimeout;
-    }
-
-    public void setConnectionTimeout(long connectionTimeout) {
-        Assert.greaterThanZero(connectionTimeout, "connectionTimeout");
-        this.connectionTimeout = connectionTimeout;
-        if (workers != null) {
-            for (NioEmitterWorker worker : workers) {
-                worker.setConnectionTimeout(connectionTimeout);
-            }
-        }
-    }
-
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
@@ -112,16 +120,6 @@
     }
 
     @Override
-    public SessionMapper getSessionMapper() {
-        return sessionMapper;
-    }
-
-    @Override
-    public void setSessionMapper(SessionMapper sessionMapper) {
-        this.sessionMapper = sessionMapper;
-    }
-
-    @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
             for (NioEmitterWorker worker : workers) {
@@ -134,7 +132,6 @@
     public void configure(Configuration config, ConfigurationContext context) {
         Emitter.super.configure(config, context);
         setMaxThreads(config.getInteger("threads", DEFAULT_NUM_THREADS));
-        setConnectionTimeout(config.getLong("connectionTimeout", DEFAULT_CONNECTION_TIMEOUT));
     }
 
     @Override
@@ -158,7 +155,7 @@
                 NioEmitterWorker worker = workerClass.getConstructor(Integer.TYPE).newInstance(i);
                 worker.setSessionMapper(sessionMapper);
                 worker.setCollectMetrics(collectMetrics);
-                worker.setConnectionTimeout(connectionTimeout);
+                worker.setTimeouts(timeouts);
                 workers[i] = worker;
                 worker.start();
             } catch (Exception e) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Tue May 19 13:25:11 2020 +0200
@@ -1,6 +1,7 @@
 package com.passus.st.emitter.nio;
 
 import com.passus.commons.Assert;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.EmitterMetric;
 import com.passus.st.emitter.SessionInfo;
@@ -11,6 +12,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import static com.passus.st.emitter.Emitter.DEFAULT_TIMEOUTS;
+
 /**
  *
  * @author Mirosław Hawrot
@@ -27,9 +30,9 @@
 
     protected volatile boolean collectMetrics = false;
 
-    protected long connectionTimeout = 5_000;
+    protected EmitterMetric metric;
 
-    protected EmitterMetric metric;
+    protected Timeouts timeouts = DEFAULT_TIMEOUTS;
 
     protected NioEmitterWorker(int index) throws IOException {
         super("NioEmitterWorker-" + index);
@@ -57,12 +60,13 @@
         this.selectTimeout = selectTimeout;
     }
 
-    public long getConnectionTimeout() {
-        return connectionTimeout;
+    public Timeouts getTimeouts() {
+        return timeouts;
     }
 
-    public void setConnectionTimeout(long connectionTimeout) {
-        this.connectionTimeout = connectionTimeout;
+    public void setTimeouts(Timeouts timeouts) {
+        Assert.notNull(timeouts, "timeouts");
+        this.timeouts = timeouts;
     }
 
     public SessionMapper getSessionMapper() {
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java	Tue May 19 13:25:11 2020 +0200
@@ -246,7 +246,7 @@
             boolean timeouted = false;
             while (!channel.finishConnect()) {
                 long now = System.currentTimeMillis();
-                if (now - connStart < connectionTimeout) {
+                if (now - connStart >= timeouts.getConnectionTimeout()) { // time out once the limit has elapsed, not before; assumes connStart is the start timestamp - confirm
                     timeouted = true;
                     break;
                 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Tue May 19 13:25:11 2020 +0200
@@ -42,6 +42,7 @@
 
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
+            channel.socket().setSoTimeout((int) timeouts.getReadTimeout());
 
             SocketAddress bindAddress = connParams.getBindAddress();
             if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Tue May 19 13:25:11 2020 +0200
@@ -6,6 +6,7 @@
 import com.passus.config.schema.MapNodeDefinition;
 import com.passus.config.validation.LongValidator;
 import com.passus.net.session.Session;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import org.apache.logging.log4j.LogManager;
@@ -47,6 +48,8 @@
 
     protected final boolean trace;
 
+    protected Timeouts timeouts = DEFAULT_TIMEOUTS;
+
     public UnidirectionalRawPacketEmitter() {
         this.trace = logger.isTraceEnabled();
     }
@@ -61,6 +64,17 @@
         return sessionMapper;
     }
 
+    @Override
+    public Timeouts getTimeouts() {
+        return timeouts;
+    }
+
+    @Override
+    public void setTimeouts(Timeouts timeouts) {
+        Assert.notNull(timeouts, "timeouts");
+        this.timeouts = timeouts;
+    }
+
     public int getThreads() {
         return threads;
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Tue May 19 13:25:11 2020 +0200
@@ -3,6 +3,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.data.HeapByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.*;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
 import org.apache.logging.log4j.LogManager;
@@ -26,6 +27,8 @@
 
     final SessionMapper sessionMapper;
 
+    final Timeouts timeouts;
+
     final ConnectionListener listener;
 
     SocketAddress localAddress;
@@ -40,10 +43,11 @@
 
     ByteBuff buffer = new HeapByteBuff();
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, boolean collectMetrics, ConnectionListener listener) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
+        this.timeouts = timeouts;
         this.listener = listener;
         this.collectMetrics = collectMetrics;
         if (collectMetrics) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Tue May 19 13:25:11 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
@@ -35,9 +36,9 @@
     private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length);
 
     public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler,
-                              SessionMapper sessionMapper, boolean collectMetrics,
-                              ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+                              SessionMapper sessionMapper, Timeouts timeouts,
+                              boolean collectMetrics, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
     }
 
     public boolean isConnected() {
@@ -88,6 +89,7 @@
         }
 
         try {
+            socket.setSoTimeout((int) timeouts.getReadTimeout());
             socket.connect(remoteSocket);
             handler.channelActive(channelContext);
         } catch (Exception ex) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Tue May 19 13:25:11 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
@@ -43,9 +44,9 @@
     private byte[] readBuffer = new byte[bufferSize];
 
     public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler,
-                            SessionMapper sessionMapper, boolean collectMetrics,
-                            ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+                            SessionMapper sessionMapper, Timeouts timeouts,
+                            boolean collectMetrics, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
     }
 
     @Override
@@ -103,7 +104,8 @@
         }
 
         try {
-            socket.connect(remoteSocket);
+            socket.setSoTimeout((int) timeouts.getReadTimeout());
+            socket.connect(remoteSocket, (int) timeouts.getConnectionTimeout());
             in = socket.getInputStream();
             out = socket.getOutputStream();
             handler.channelActive(channelContext);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue May 19 13:25:11 2020 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.emitter.socket;
 
+import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.config.Configuration;
 import com.passus.config.ConfigurationContext;
@@ -7,6 +8,7 @@
 import com.passus.config.schema.MapNodeDefinition;
 import com.passus.net.session.Session;
 import com.passus.st.client.PerNameMetricsContainer;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
@@ -37,12 +39,12 @@
 
     private int maxThreads = DEFAULT_NUM_THREADS;
 
-    private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
-
     private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer();
 
     private boolean started;
 
+    private Timeouts timeouts = DEFAULT_TIMEOUTS;
+
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
@@ -54,6 +56,17 @@
     }
 
     @Override
+    public Timeouts getTimeouts() {
+        return timeouts;
+    }
+
+    @Override
+    public void setTimeouts(Timeouts timeouts) {
+        Assert.notNull(timeouts, "timeouts"); // pass the argument name, as the NIO and raw-packet emitters do
+        this.timeouts = timeouts;
+    }
+
+    @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
     }
@@ -168,9 +181,9 @@
 
             Connection connection;
             if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
-                connection = new SocketConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+                connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
             } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
             } else {
                 throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + ".");
             }
--- a/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java	Tue May 19 11:43:47 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java	Tue May 19 13:25:11 2020 +0200
@@ -67,6 +67,16 @@
         }
 
         @Override
+        public void setTimeouts(Timeouts timeouts) {
+
+        }
+
+        @Override
+        public Timeouts getTimeouts() {
+            return null;
+        }
+
+        @Override
         public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
             LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
             try {