changeset 1110:e488f699f8c2

SocketParameters
author Devel 2
date Wed, 20 May 2020 11:27:49 +0200
parents 736e2559c13c
children 7da8b2a6bb2e
files stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java stress-tester/src/main/java/com/passus/st/config/SocketParametersNodeDefCreator.java stress-tester/src/main/java/com/passus/st/config/SocketParametersRuleNodeTransformer.java stress-tester/src/main/java/com/passus/st/emitter/SocketParameters.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java stress-tester/src/test/java/com/passus/st/emitter/EmitterConfiguratorTest.java stress-tester/src/test/resources/com/passus/st/config/test_job_config.yml stress-tester/src/test/resources/com/passus/st/project/test_project_config.yml
diffstat 15 files changed, 331 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/config/CommonNodeDefs.java	Wed May 20 11:27:49 2020 +0200
@@ -2,6 +2,8 @@
 
 import com.passus.config.schema.NodeDefinition;
 import com.passus.config.validation.LongValidator;
+import com.passus.st.filter.MessagePredicateNodeDefinition;
+import com.passus.st.validation.PortValidator;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/SocketParametersNodeDefCreator.java	Wed May 20 11:27:49 2020 +0200
@@ -0,0 +1,24 @@
+package com.passus.st.config;
+
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
+
+import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.st.config.CommonNodeDefs.BOOLEAN_DEF;
+import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF;
+/** Creates the map definition for socket parameters; every key it lists is optional. */
+public class SocketParametersNodeDefCreator implements NodeDefinitionCreator {
+    /** Returns the map definition with a SocketParametersRuleNodeTransformer set as its transformer. */
+    @Override
+    public NodeDefinition create() {
+        return mapDef(
+                tupleDef("broadcast", BOOLEAN_DEF).setRequired(false),
+                tupleDef("keepalive", BOOLEAN_DEF).setRequired(false),
+                tupleDef("linger", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
+                tupleDef("sendbuffersize", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
+                tupleDef("receivebuffersize", INT_GREATER_THAN_ZERO_DEF).setRequired(false)
+        ).setTransformer(new SocketParametersRuleNodeTransformer());
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/SocketParametersRuleNodeTransformer.java	Wed May 20 11:27:49 2020 +0200
@@ -0,0 +1,84 @@
+package com.passus.st.config;
+
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.st.emitter.SocketParameters;
+
+import java.util.List;
+import java.util.Locale;
+
+import static com.passus.config.ConfigurationUtils.extractBoolean;
+import static com.passus.config.ConfigurationUtils.extractInteger;
+
+/**
+ * Converts a map node of socket options into a value node holding a
+ * {@link SocketParameters} instance.
+ * <p>
+ * Recognised keys, matched case-insensitively: {@code broadcast},
+ * {@code keepalive}, {@code linger}, {@code sendbuffersize} and
+ * {@code receivebuffersize}.
+ */
+public class SocketParametersRuleNodeTransformer implements NodeTransformer<CNode> {
+
+    /**
+     * Builds a {@link SocketParameters} from the tuples of the given map node.
+     *
+     * @param node    the node to transform; it is cast to {@link CMapNode}
+     * @param errors  not used by this implementation
+     * @param context not used by this implementation
+     * @return a {@link CValueNode} wrapping the populated parameters
+     * @throws IllegalArgumentException if a key is unknown or its value cannot be extracted
+     */
+    @Override
+    public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
+        CMapNode mapNode = (CMapNode) node;
+
+        SocketParameters socketParams = new SocketParameters();
+        List<CTupleNode> tuples = mapNode.getChildren();
+        for (CTupleNode tuple : tuples) {
+            String opName = tuple.getName();
+            try {
+                // Locale.ROOT keeps key matching independent of the JVM default locale
+                // (under a Turkish locale "LINGER" would lower-case to "lınger", not "linger").
+                switch (opName.toLowerCase(Locale.ROOT)) {
+                    case "broadcast":
+                        socketParams.setBroadcast(extractBoolean(tuple));
+                        break;
+                    case "keepalive":
+                        socketParams.setKeepalive(extractBoolean(tuple));
+                        break;
+                    case "linger":
+                        socketParams.setLinger(extractInteger(tuple));
+                        break;
+                    case "sendbuffersize":
+                        socketParams.setSendBufferSize(extractInteger(tuple));
+                        break;
+                    case "receivebuffersize":
+                        socketParams.setReceiveBufferSize(extractInteger(tuple));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unknown parameter '" + opName + "'.");
+                }
+            } catch (IllegalArgumentException ex) {
+                // Already the exception type this method reports; rethrow without re-wrapping.
+                throw ex;
+            } catch (Exception ex) {
+                throw new IllegalArgumentException(ex.getMessage(), ex);
+            }
+        }
+
+        return new CValueNode(socketParams);
+    }
+
+    /**
+     * Always throws: the reverse transformation is not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
+        throw new UnsupportedOperationException("Impossible.");
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SocketParameters.java	Wed May 20 11:27:49 2020 +0200
@@ -0,0 +1,186 @@
+package com.passus.st.emitter;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.NetworkChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Optional socket-level settings that can be applied to plain sockets and NIO channels.
+ * <p>
+ * A setting left at its initial value ({@code -1} for buffer sizes and linger,
+ * {@code null} for the boolean flags) is skipped by every {@code apply} method, so the
+ * target keeps the value it already had. Note that a linger of {@code 0} is skipped too.
+ */
+public class SocketParameters {
+
+    /** SO_SNDBUF in bytes; applied only when greater than zero. */
+    private int sendBufferSize = -1;
+
+    /** SO_RCVBUF in bytes; applied only when greater than zero. */
+    private int receiveBufferSize = -1;
+
+    /** SO_REUSEADDR; {@code null} means "do not touch". */
+    private Boolean reUseAddress = null;
+
+    /** SO_KEEPALIVE (stream sockets only); {@code null} means "do not touch". */
+    private Boolean keepalive = null;
+
+    /** SO_BROADCAST (datagram sockets only); {@code null} means "do not touch". */
+    private Boolean broadcast = null;
+
+    /** SO_LINGER timeout in seconds; applied only when greater than zero. */
+    private int linger = -1;
+
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public Boolean getReUseAddress() {
+        return reUseAddress;
+    }
+
+    public void setReUseAddress(Boolean reUseAddress) {
+        this.reUseAddress = reUseAddress;
+    }
+
+    public Boolean getKeepalive() {
+        return keepalive;
+    }
+
+    public void setKeepalive(Boolean keepalive) {
+        this.keepalive = keepalive;
+    }
+
+    public Boolean getBroadcast() {
+        return broadcast;
+    }
+
+    public void setBroadcast(Boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    public int getLinger() {
+        return linger;
+    }
+
+    public void setLinger(int linger) {
+        this.linger = linger;
+    }
+
+    /**
+     * Applies the buffer sizes, address reuse and broadcast settings to a datagram socket.
+     *
+     * @param socket the socket to configure
+     * @throws SocketException if setting one of the options fails
+     */
+    public void apply(DatagramSocket socket) throws SocketException {
+        if (sendBufferSize > 0) {
+            socket.setSendBufferSize(sendBufferSize);
+        }
+
+        if (receiveBufferSize > 0) {
+            socket.setReceiveBufferSize(receiveBufferSize);
+        }
+
+        if (reUseAddress != null) {
+            socket.setReuseAddress(reUseAddress);
+        }
+
+        // Mirrors apply(DatagramChannel), which also honours the broadcast flag.
+        if (broadcast != null) {
+            socket.setBroadcast(broadcast);
+        }
+    }
+
+    /**
+     * Applies the buffer sizes, address reuse, keep-alive and linger settings to a stream socket.
+     *
+     * @param socket the socket to configure
+     * @throws SocketException if setting one of the options fails
+     */
+    public void apply(Socket socket) throws SocketException {
+        if (sendBufferSize > 0) {
+            socket.setSendBufferSize(sendBufferSize);
+        }
+
+        if (receiveBufferSize > 0) {
+            socket.setReceiveBufferSize(receiveBufferSize);
+        }
+
+        if (reUseAddress != null) {
+            socket.setReuseAddress(reUseAddress);
+        }
+
+        if (keepalive != null) {
+            socket.setKeepAlive(keepalive);
+        }
+
+        if (linger > 0) {
+            socket.setSoLinger(true, linger);
+        }
+    }
+
+    /** Applies the options shared by both channel types: buffer sizes and address reuse. */
+    private void applyNetworkChannel(NetworkChannel channel) throws IOException {
+        if (sendBufferSize > 0) {
+            channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+        }
+
+        if (receiveBufferSize > 0) {
+            channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+        }
+
+        if (reUseAddress != null) {
+            channel.setOption(StandardSocketOptions.SO_REUSEADDR, reUseAddress);
+        }
+    }
+
+    /**
+     * Applies the common channel options plus keep-alive and linger to a socket channel.
+     *
+     * @param channel the channel to configure
+     * @throws IOException if setting one of the options fails
+     */
+    public void apply(SocketChannel channel) throws IOException {
+        applyNetworkChannel(channel);
+
+        if (keepalive != null) {
+            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepalive);
+        }
+
+        if (linger > 0) {
+            channel.setOption(StandardSocketOptions.SO_LINGER, linger);
+        }
+    }
+
+    /**
+     * Applies the common channel options plus the broadcast flag to a datagram channel.
+     *
+     * @param channel the channel to configure
+     * @throws IOException if setting one of the options fails
+     */
+    public void apply(DatagramChannel channel) throws IOException {
+        applyNetworkChannel(channel);
+
+        if (broadcast != null) {
+            channel.setOption(StandardSocketOptions.SO_BROADCAST, broadcast);
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Wed May 20 11:27:49 2020 +0200
@@ -1,29 +1,24 @@
 package com.passus.st.emitter.nio;
 
-import com.passus.config.ConfigurationContext;
-import com.passus.st.client.Timeouts;
-import com.passus.st.emitter.SessionMapper;
 import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.commons.service.ServiceException;
 import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
 import com.passus.config.annotations.NodeDefinitionCreate;
-
-import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
-
 import com.passus.config.schema.MapNodeDefinition;
-import com.passus.config.validation.LongValidator;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
+import com.passus.st.client.Timeouts;
+import com.passus.st.config.SocketParametersNodeDefCreator;
+import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
-import com.passus.st.utils.PeriodValueTransformer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF;
 
 /**
  * @author Mirosław Hawrot
@@ -40,6 +35,8 @@
 
     private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000;
 
+    static final SocketParameters DEFAULT_SOCKET_PARAMETERS = new SocketParameters(); // shared fallback instance
+
     private NioEmitterWorker[] workers;
 
     private int maxThreads = DEFAULT_NUM_THREADS;
@@ -56,6 +53,8 @@
 
     private Timeouts timeouts = DEFAULT_TIMEOUTS;
 
+    private SocketParameters socketParameters = DEFAULT_SOCKET_PARAMETERS;
+
     public NioEmitter() {
         this(NioEmitterWorkerImpl.class);
     }
@@ -86,6 +85,15 @@
         this.timeouts = timeouts;
     }
 
+    public SocketParameters getSocketParameters() {
+        return socketParameters;
+    }
+
+    public void setSocketParameters(SocketParameters socketParameters) {
+        Assert.notNull(socketParameters, "socketParameters");
+        this.socketParameters = socketParameters;
+    }
+
     public int getMaxThreads() {
         return maxThreads;
     }
@@ -132,6 +140,7 @@
     public void configure(Configuration config, ConfigurationContext context) {
         Emitter.super.configure(config, context);
         setMaxThreads(config.getInteger("threads", DEFAULT_NUM_THREADS));
+        setSocketParameters(config.get("socket", DEFAULT_SOCKET_PARAMETERS));
     }
 
     @Override
@@ -156,6 +165,7 @@
                 worker.setSessionMapper(sessionMapper);
                 worker.setCollectMetrics(collectMetrics);
                 worker.setTimeouts(timeouts);
+                worker.setSocketParameters(socketParameters);
                 workers[i] = worker;
                 worker.start();
             } catch (Exception e) {
@@ -208,14 +218,10 @@
         @Override
         public MapNodeDefinition create() {
             MapNodeDefinition def = super.create();
-            def.add("threads",
-                    valueDefInteger().addValidator(LongValidator.GREATER_ZERO)
-            ).setRequired(false);
-
-            def.add("connectionTimeout",
-                    valueDef().setTransformer(PeriodValueTransformer.INSTANCE)
-            ).setRequired(false);
-
+            def.add(
+                    tupleDef("threads", INT_GREATER_THAN_ZERO_DEF).setRequired(false),
+                    tupleDef("socket", new SocketParametersNodeDefCreator().create()).setRequired(false)
+            );
             return def;
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Wed May 20 11:27:49 2020 +0200
@@ -2,10 +2,7 @@
 
 import com.passus.commons.Assert;
 import com.passus.st.client.Timeouts;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.EmitterMetric;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricSource;
 import java.io.IOException;
 import java.nio.channels.SelectionKey;
@@ -13,6 +10,7 @@
 import org.apache.logging.log4j.Logger;
 
 import static com.passus.st.emitter.Emitter.DEFAULT_TIMEOUTS;
+import static com.passus.st.emitter.nio.NioEmitter.DEFAULT_SOCKET_PARAMETERS;
 
 /**
  *
@@ -34,6 +32,8 @@
 
     protected Timeouts timeouts = DEFAULT_TIMEOUTS;
 
+    protected SocketParameters socketParameters = DEFAULT_SOCKET_PARAMETERS;
+
     protected NioEmitterWorker(int index) throws IOException {
         super("NioEmitterWorker-" + index);
         this.index = index;
@@ -73,6 +73,21 @@
         return sessionMapper;
     }
 
+    public SocketParameters getSocketParameters() {
+        return socketParameters;
+    }
+
+    /**
+     * Sets the socket parameters used by this worker.
+     *
+     * @param socketParameters parameters to use; must not be {@code null}, matching the
+     *                         check done by the emitters' own setters
+     */
+    public void setSocketParameters(SocketParameters socketParameters) {
+        Assert.notNull(socketParameters, "socketParameters");
+        this.socketParameters = socketParameters;
+    }
+
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Wed May 20 11:27:49 2020 +0200
@@ -42,6 +42,7 @@
 
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
+            socketParameters.apply(channel);
             channel.socket().setSoTimeout((int) timeouts.getReadTimeout());
 
             SocketAddress bindAddress = connParams.getBindAddress();
@@ -94,6 +95,7 @@
 
             DatagramChannel channel = DatagramChannel.open();
             channel.configureBlocking(false);
+            socketParameters.apply(channel);
 
             SocketAddress bindAddress = connParams.getBindAddress();
             if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed May 20 11:27:49 2020 +0200
@@ -29,6 +29,8 @@
 
     final Timeouts timeouts;
 
+    final SocketParameters socketParameters;
+
     final ConnectionListener listener;
 
     SocketAddress localAddress;
@@ -43,11 +45,14 @@
 
     ByteBuff buffer = new HeapByteBuff();
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, boolean collectMetrics, ConnectionListener listener) {
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
+                      Timeouts timeouts, SocketParameters socketParameters,
+                      boolean collectMetrics, ConnectionListener listener) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
         this.timeouts = timeouts;
+        this.socketParameters = socketParameters;
         this.listener = listener;
         this.collectMetrics = collectMetrics;
         if (collectMetrics) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed May 20 11:27:49 2020 +0200
@@ -7,6 +7,7 @@
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.emitter.SocketParameters;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -35,10 +36,10 @@
 
     private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length);
 
-    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler,
-                              SessionMapper sessionMapper, Timeouts timeouts,
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
+                              Timeouts timeouts, SocketParameters socketParameters,
                               boolean collectMetrics, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
+        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
     }
 
     public boolean isConnected() {
@@ -67,6 +68,8 @@
             } else {
                 socket = new DatagramSocket();
             }
+
+            socketParameters.apply(socket);
         } catch (SocketException ex) {
             doCatchException(channelContext, ex);
             return;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Wed May 20 11:27:49 2020 +0200
@@ -6,6 +6,7 @@
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.SocketParameters;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -43,10 +44,10 @@
 
     private byte[] readBuffer = new byte[bufferSize];
 
-    public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler,
-                            SessionMapper sessionMapper, Timeouts timeouts,
+    public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
+                            Timeouts timeouts, SocketParameters socketParameters,
                             boolean collectMetrics, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
+        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
     }
 
     @Override
@@ -82,6 +83,8 @@
             if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
                 socket.bind(socketAddressToJdkSocket(bindAddress));
             }
+
+            socketParameters.apply(socket);
         } catch (IOException ex) {
             doCatchException(channelContext, ex);
             return;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed May 20 11:27:49 2020 +0200
@@ -9,6 +9,7 @@
 import com.passus.net.session.Session;
 import com.passus.st.client.PerNameMetricsContainer;
 import com.passus.st.client.Timeouts;
+import com.passus.st.config.SocketParametersNodeDefCreator;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
@@ -19,6 +20,9 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF;
+
 @NodeDefinitionCreate(SocketEmitter.SocketEmitterNodeDefCreator.class)
 @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class SocketEmitter implements Emitter {
@@ -29,6 +33,8 @@
 
     private static final int DEFAULT_NUM_THREADS = 4;
 
+    static final SocketParameters DEFAULT_SOCKET_PARAMETERS = new SocketParameters(); // shared fallback instance
+
     private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
 
     private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000;
@@ -45,6 +51,8 @@
 
     private Timeouts timeouts = DEFAULT_TIMEOUTS;
 
+    private SocketParameters socketParameters = DEFAULT_SOCKET_PARAMETERS;
+
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
@@ -62,10 +70,19 @@
 
     @Override
     public void setTimeouts(Timeouts timeouts) {
-        Assert.notNull(timeouts);
+        Assert.notNull(timeouts, "timeouts");
         this.timeouts = timeouts;
     }
 
+    public SocketParameters getSocketParameters() {
+        return socketParameters;
+    }
+
+    public void setSocketParameters(SocketParameters socketParameters) {
+        Assert.notNull(socketParameters, "socketParameters");
+        this.socketParameters = socketParameters;
+    }
+
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
@@ -99,6 +116,7 @@
     @Override
     public void configure(Configuration config, ConfigurationContext context) {
         Emitter.super.configure(config, context);
+        setSocketParameters(config.get("socket", DEFAULT_SOCKET_PARAMETERS));
     }
 
     @Override
@@ -181,9 +199,9 @@
 
             Connection connection;
             if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
-                connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
+                connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
             } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, collectMetrics, listener);
+                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
             } else {
                 throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + ".");
             }
@@ -198,7 +216,11 @@
 
         @Override
         public MapNodeDefinition create() {
-            return super.create();
+            MapNodeDefinition map = super.create();
+            map.add(
+                    tupleDef("socket", new SocketParametersNodeDefCreator().create()).setRequired(false)
+            );
+            return map;
         }
 
     }
--- a/stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java	Wed May 20 11:27:49 2020 +0200
@@ -32,7 +32,7 @@
 
 public class TestJobConfiguratorTest {
 
-    private Errors errors = new Errors();
+    private final Errors errors = new Errors();
 
     @BeforeClass
     public void beforeClass() {
--- a/stress-tester/src/test/java/com/passus/st/emitter/EmitterConfiguratorTest.java	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/EmitterConfiguratorTest.java	Wed May 20 11:27:49 2020 +0200
@@ -5,6 +5,7 @@
 import com.passus.config.ConfigurationContextImpl;
 import com.passus.config.YamlConfigurationReader;
 import com.passus.config.validation.Errors;
+import com.passus.st.client.Timeouts;
 import com.passus.st.emitter.nio.NioEmitter;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -41,16 +42,27 @@
                 "emitters:\n" +
                 "    - type: nio\n" +
                 "      collectMetrics: true\n" +
-                "      sessionMapper: '2.2.2.2:80->2.2.2.2:90'\n";
+                "      sessionMapper: '2.2.2.2:80->2.2.2.2:90'\n" +
+                "      timeouts: \n" +
+                "           connect: 1\n" +
+                "           read: 2\n" +
+                "           disconnect: 3\n";
 
         ConfigurationContext context = new ConfigurationContextImpl();
         processConfig(configStr, context);
 
-        Emitter defaultEmitters = (Emitter) context.get(EMITTER_DEFAULT_EMITTER);
+        Emitter defaultEmitters = context.get(EMITTER_DEFAULT_EMITTER);
         assertTrue(context.get(EMITTER_SESSION_MAPPER) instanceof RuleBasedSessionMapper);
-        List<Emitter> emitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
+        List<Emitter> emitters = context.get(EMITTER_EMITTERS);
         assertEquals(1, emitters.size());
-        assertSame(defaultEmitters, emitters.get(0));
+
+        Emitter emitter = emitters.get(0);
+        assertSame(defaultEmitters, emitter);
+
+        Timeouts timeouts = emitter.getTimeouts();
+        assertEquals(1L, timeouts.getConnectionTimeout());
+        assertEquals(2L, timeouts.getReadTimeout());
+        assertEquals(3L, timeouts.getDisconnectTimeout());
     }
 
     @Test
@@ -71,9 +83,9 @@
         ConfigurationContext context = new ConfigurationContextImpl();
         processConfig(configStr, context);
 
-        SessionMapper sessionMapper = (SessionMapper) context.get(EMITTER_SESSION_MAPPER);
+        SessionMapper sessionMapper = context.get(EMITTER_SESSION_MAPPER);
         assertTrue(sessionMapper instanceof RuleBasedSessionMapper);
-        List<Emitter> emitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
+        List<Emitter> emitters = context.get(EMITTER_EMITTERS);
         Emitter emitter = emitters.get(0);
         assertSame(sessionMapper, emitter.getSessionMapper());
     }
--- a/stress-tester/src/test/resources/com/passus/st/config/test_job_config.yml	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/test/resources/com/passus/st/config/test_job_config.yml	Wed May 20 11:27:49 2020 +0200
@@ -15,8 +15,6 @@
     sessionMapper: "1.1.1.1:80->2.2.2.2:90"
     emitters:
         - type: nio
-          connectionTimeout: 5s
-          #connectionAttempts: 3
           threads: 4
           collectMetrics: true
 
--- a/stress-tester/src/test/resources/com/passus/st/project/test_project_config.yml	Wed May 20 09:45:26 2020 +0200
+++ b/stress-tester/src/test/resources/com/passus/st/project/test_project_config.yml	Wed May 20 11:27:49 2020 +0200
@@ -18,8 +18,6 @@
     sessionMapper: "1.1.1.1:80->2.2.2.2:90"
     emitters:
         - type: nio
-          connectionTimeout: 5s
-          #connectionAttempts: 3
           threads: 4
           collectMetrics: true