changeset 978:d88ea87ac0a7

SocketEmitter in progress, NioEmitter improvements
author Devel 2
date Mon, 29 Jul 2019 12:54:31 +0200
parents 71d0f31b7e44
children 700a3005eaf8
files stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/Emitter.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java
diffstat 20 files changed, 621 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Jul 29 12:54:31 2019 +0200
@@ -354,6 +354,7 @@
 
                 context.setBidirectional(flowContext.isBidirectional());
                 flowContext.channelContext(context);
+                context.setAttachment(flowContext);
                 changeFlowState(flowContext, STATE_CONNECTED);
             }
 
@@ -364,14 +365,12 @@
     @Override
     public void channelInactive(ChannelContext context) throws Exception {
         synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, "Channel inactive.");
-                }
+            FlowContext flowContext = (FlowContext) context.getAttachment();
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Channel inactive.");
+            }
 
-                changeFlowState(flowContext, STATE_DISCONNECTED);
-            }
+            changeFlowState(flowContext, STATE_DISCONNECTED);
             lock.notifyAll();
         }
     }
@@ -379,42 +378,40 @@
     @Override
     public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
         synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
+            FlowContext flowContext = (FlowContext) context.getAttachment();
             try {
-                if (flowContext != null) {
-                    FlowHandler client = flowContext.client();
-                    FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                    decoder.decode(data, flowContext);
-                    long now = timeGenerator.currentTimeMillis();
-                    if (flowContext.receivedStartTimestamp() == -1) {
-                        flowContext.receivedStartTimestamp(now);
+                FlowHandler client = flowContext.client();
+                FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
+                decoder.decode(data, flowContext);
+                long now = timeGenerator.currentTimeMillis();
+                if (flowContext.receivedStartTimestamp() == -1) {
+                    flowContext.receivedStartTimestamp(now);
+                }
+
+                if (decoder.state() == DataDecoder.STATE_ERROR) {
+                    if (logger.isDebugEnabled()) {
+                        debug(flowContext, "Decoder error. " + decoder.getLastError());
                     }
 
-                    if (decoder.state() == DataDecoder.STATE_ERROR) {
-                        if (logger.isDebugEnabled()) {
-                            debug(flowContext, "Decoder error. " + decoder.getLastError());
-                        }
+                    decoder.clear(flowContext);
+                    changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
+                } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                    Object resp = decoder.getResult();
+                    Object req = null;
+                    if (flowContext.sentEvent() != null) {
+                        req = flowContext.sentEvent().getRequest();
+                    }
 
-                        decoder.clear(flowContext);
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                        Object resp = decoder.getResult();
-                        Object req = null;
-                        if (flowContext.sentEvent() != null) {
-                            req = flowContext.sentEvent().getRequest();
+                    if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
+                        try {
+                            fireResponseReceived(req, resp, flowContext);
+                        } catch (Exception e) {
+                            error(flowContext, e.getMessage(), e);
                         }
+                    }
 
-                        if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
-                            try {
-                                fireResponseReceived(req, resp, flowContext);
-                            } catch (Exception e) {
-                                error(flowContext, e.getMessage(), e);
-                            }
-                        }
-
-                        decoder.clear(flowContext);
-                        changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
-                    }
+                    decoder.clear(flowContext);
+                    changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED);
                 }
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
@@ -429,8 +426,8 @@
     @Override
     public void dataWriteStart(ChannelContext context) {
         synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
+            FlowContext flowContext = (FlowContext) context.getAttachment();
+            if (flowContext.sentEvent() != null) {
                 long now = timeGenerator.currentTimeMillis();
                 flowContext.sendStartTimestamp(now);
                 flowContext.client().onDataWriteStart(flowContext);
@@ -441,8 +438,8 @@
     @Override
     public void dataWritten(ChannelContext context) throws Exception {
         synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null && flowContext.sentEvent() != null) {
+            FlowContext flowContext = (FlowContext) context.getAttachment();
+            if (flowContext.sentEvent() != null) {
                 long now = timeGenerator.currentTimeMillis();
                 if (collectMetric) {
                     synchronized (metric) {
@@ -467,11 +464,8 @@
         }
 
         synchronized (lock) {
-            FlowContext flowContext = flowContext(context);
-            if (flowContext != null) {
-                changeFlowState(flowContext, FlowContext.STATE_ERROR);
-            }
-
+            FlowContext flowContext = (FlowContext) context.getAttachment();
+            changeFlowState(flowContext, FlowContext.STATE_ERROR);
             lock.notifyAll();
         }
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -8,7 +8,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public interface ChannelContext {
+public interface ChannelContext<T> {
 
     boolean isBidirectional();
 
@@ -42,4 +42,8 @@
 
     SessionInfo getSessionInfo();
 
+    void setAttachment(T attachment);
+
+    T getAttachment();
+
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Mon Jul 29 12:54:31 2019 +0200
@@ -20,6 +20,8 @@
 
     public static final SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper();
 
+    public static final boolean DEFAULT_COLLECT_METRICS = false;
+
     public void setSessionMapper(SessionMapper mapper);
 
     public SessionMapper getSessionMapper();
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Mon Jul 29 12:54:31 2019 +0200
@@ -14,7 +14,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public class SessionInfo {
+public final class SessionInfo {
 
     private static final int DEFAULT_TRANSPORT = Session.PROTOCOL_TCP;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Mon Jul 29 12:54:31 2019 +0200
@@ -258,14 +258,19 @@
 
     protected void doWrite(SelectionKey key) {
         KeyContext keyContext = (KeyContext) key.attachment();
+
+        NioChannelContext channelContext = keyContext.channelContext;
+        Queue<ByteBuffer> queue = channelContext.dataQueue();
+        if(queue.isEmpty()) {
+            return;
+        }
+
+        keyContext.handler.dataWriteStart(channelContext);
         if (logger.isDebugEnabled()) {
             logger.debug("Writing ({} -> {}).",
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
 
-        NioChannelContext channelContext = keyContext.channelContext;
-        keyContext.handler.dataWriteStart(channelContext);
-        Queue<ByteBuffer> queue = channelContext.dataQueue();
         int written = 0;
         try {
             ByteBuffer buffer;
@@ -320,7 +325,6 @@
         }
 
         ByteBuffer buffer = keyContext.buffer;
-
         buffer.clear();
 
         NioChannelContext channelContext = keyContext.channelContext;
@@ -371,18 +375,16 @@
         int selected = 0;
         working = true;
         while (working) {
-            if (!tasks.isEmpty()) {
-                Task task;
-                while ((task = tasks.poll()) != null) {
-                    if (task.code == Task.CLOSE) {
-                        doClose(((CloseTask) task).key);
-                    } else if (task.code == Task.CONNECT) {
-                        ConnectTask taskConn = (ConnectTask) task;
-                        doConnect(taskConn.sessionInfo, taskConn.handler);
-                    } else if (task.code == Task.FLUSH) {
-                        FlushTask flushTask = (FlushTask) task;
-                        setOpWrite(flushTask.key);
-                    }
+            Task task;
+            while ((task = tasks.poll()) != null) {
+                if (task.code == Task.CLOSE) {
+                    doClose(((CloseTask) task).key);
+                } else if (task.code == Task.CONNECT) {
+                    ConnectTask taskConn = (ConnectTask) task;
+                    doConnect(taskConn.sessionInfo, taskConn.handler);
+                } else if (task.code == Task.FLUSH) {
+                    FlushTask flushTask = (FlushTask) task;
+                    setOpWrite(flushTask.key);
                 }
             }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -11,7 +11,7 @@
 import java.util.LinkedList;
 import java.util.Queue;
 
-public abstract class NioChannelContext<T> implements ChannelContext {
+public abstract class NioChannelContext<T, K> implements ChannelContext<K> {
 
     protected final NioEmitterWorker worker;
 
@@ -29,6 +29,8 @@
 
     protected SelectionKey key;
 
+    private K attachment;
+
     public NioChannelContext(NioEmitterWorker worker, T channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         this.worker = worker;
         this.channel = channel;
@@ -91,5 +93,12 @@
         return sessionInfo;
     }
 
+    @Override
+    public K getAttachment() {
+        return attachment;
+    }
 
+    public void setAttachment(K attachment) {
+        this.attachment = attachment;
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Mon Jul 29 12:54:31 2019 +0200
@@ -16,7 +16,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public class NioChannelContext2 implements ChannelContext {
+public class NioChannelContext2<K> implements ChannelContext<K> {
 
     private final NioEmitterWorker2 worker;
 
@@ -34,6 +34,8 @@
 
     private SelectionKey key;
 
+    private K attachment;
+
     /**
      * Usunac
      */
@@ -122,4 +124,13 @@
         return sessionInfo;
     }
 
+    @Override
+    public K getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public void setAttachment(K attachment) {
+        this.attachment = attachment;
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -8,7 +8,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 
-public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> {
+public class NioDatagramChannelContext<T> extends NioChannelContext<DatagramChannel, T> {
 
     public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Mon Jul 29 12:54:31 2019 +0200
@@ -39,8 +39,6 @@
 
     private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000;
 
-    private static final boolean DEFAULT_COLLECT_METRICS = false;
-
     private NioEmitterWorker[] workers;
 
     private int maxThreads = DEFAULT_NUM_THREADS;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Jul 29 12:54:31 2019 +0200
@@ -21,7 +21,7 @@
 
     private final int index;
 
-    protected long selectTimeout = 100;
+    protected long selectTimeout = 1;
 
     protected SessionMapper sessionMapper;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Mon Jul 29 12:54:31 2019 +0200
@@ -36,6 +36,10 @@
                 return;
             }
 
+            if (logger.isDebugEnabled()) {
+                logger.debug("Registering TCP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+            }
+
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
 
@@ -84,7 +88,7 @@
             }
 
             if (logger.isDebugEnabled()) {
-                logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+                logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
             }
 
             DatagramChannel channel = DatagramChannel.open();
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -11,7 +11,7 @@
 /**
  * @author Mirosław Hawrot
  */
-public class NioSocketChannelContext extends NioChannelContext<SocketChannel> {
+public class NioSocketChannelContext<T> extends NioChannelContext<SocketChannel, T> {
 
     public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         super(worker, channel, remoteAddress, sessionInfo);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -0,0 +1,90 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
+
+    private final Connection connection;
+
+    private boolean bidirectional;
+
+    private final Queue<byte[]> dataQueue;
+
+    private K attachment;
+
+    public AbstractChannelContext(Connection connection) {
+        this.connection = connection;
+        this.dataQueue = new LinkedList<>();
+    }
+
+    @Override
+    public boolean isBidirectional() {
+        return bidirectional;
+    }
+
+    @Override
+    public void setBidirectional(boolean bidirectional) {
+        this.bidirectional = bidirectional;
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return connection.localAddress;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return connection.remoteAddress;
+    }
+
+    @Override
+    public SessionInfo getSessionInfo() {
+        return connection.sessionInfo;
+    }
+
+    public Queue<byte[]> dataQueue() {
+        return dataQueue;
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        byte[] out = new byte[length];
+        System.arraycopy(data, offset, out, 0, length);
+        dataQueue.add(out);
+    }
+
+    @Override
+    public void write(ByteBuff data) throws IOException {
+        dataQueue.add(data.toArray());
+    }
+
+    public void flush() throws IOException {
+        synchronized (connection) {
+            connection.flag = Connection.FLAG_FLUSH;
+            connection.notify();
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (connection) {
+            connection.flag = Connection.FLAG_CLOSE;
+            connection.notify();
+        }
+    }
+
+    @Override
+    public K getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(K attachment) {
+        this.attachment = attachment;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jul 29 12:54:31 2019 +0200
@@ -0,0 +1,134 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Queue;
+
+public abstract class Connection extends Thread {
+
+    protected final Logger logger = LogManager.getLogger(getClass());
+
+    static final int FLAG_CONNECT = 1;
+    static final int FLAG_FLUSH = 2;
+    static final int FLAG_READ = 3;
+    static final int FLAG_CLOSE = 4;
+
+    private boolean working = true;
+
+    int flag = FLAG_CONNECT;
+
+    final SessionInfo sessionInfo;
+
+    final EmitterHandler handler;
+
+    final SessionMapper sessionMapper;
+
+    SocketAddress localAddress;
+
+    SocketAddress remoteAddress;
+
+    AbstractChannelContext channelContext;
+
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) {
+        this.sessionInfo = sessionInfo;
+        this.handler = handler;
+        this.sessionMapper = sessionMapper;
+    }
+
+    protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) {
+        ConnectionParams connParams = sessionMapper.map(sessionInfo);
+        if (connParams == null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Unable to map session '{}'.", sessionInfo);
+            }
+
+           /* if (collectMetrics) {
+                synchronized (metric) {
+                    metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
+                }
+            }*/
+
+            try {
+                handler.sessionInvalidated(sessionInfo);
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+
+            return null;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        return connParams;
+    }
+
+    protected void doCatchException(AbstractChannelContext channelContext, Throwable cause) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
+        }
+
+        /*if (collectMetrics) {
+                synchronized (metric) {
+                    metric.errorCaught(cause);
+                }
+        }*/
+
+        try {
+            handler.errorOccurred(channelContext, cause);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    public abstract void connect();
+
+    public abstract void close();
+
+    private void write(byte[] data) {
+        write(data, 0, data.length);
+    }
+
+    public abstract void write(byte[] data, int offset, int length);
+
+    @Override
+    public void run() {
+        connect();
+        while (working) {
+            synchronized (this) {
+                if (flag != 0) {
+                    try {
+                        wait(1);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+                // wait(1) above releases this monitor; that is when flush()/close() on the channel context can set the flag.
+                switch (flag) {
+                    case FLAG_FLUSH:
+                        Queue<byte[]> queue = channelContext.dataQueue();
+                        if (!queue.isEmpty()) {
+                            byte[] data;
+                            while ((data = queue.poll()) != null) {
+                                write(data);
+                            }
+                        }
+
+                        break;
+                    case FLAG_READ:
+                        break;
+                    case FLAG_CLOSE:
+                        close();
+                        working = false; // closed: leave the loop so the thread ends instead of re-closing forever
+                        break;
+                }
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Mon Jul 29 12:54:31 2019 +0200
@@ -0,0 +1,118 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+
+import static com.passus.net.utils.AddressUtils.jdkSocketToSocketAddress;
+import static com.passus.net.utils.AddressUtils.socketAddressToJdkSocket;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+public class DatagramConnection extends Connection {
+
+    private final Logger logger = LogManager.getLogger(DatagramConnection.class);
+
+    private DatagramSocket socket;
+
+    private java.net.SocketAddress remoteSocket;
+
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) {
+        super(sessionInfo, handler, sessionMapper);
+    }
+
+    public void connect() {
+        ConnectionParams connParams = getConnParams(sessionInfo, handler);
+        if (connParams == null) {
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+        // Honor the mapped bind address unless it is absent or ANY_SOCKET; otherwise use the default constructor.
+        try {
+            SocketAddress bindAddress = connParams.getBindAddress();
+            if (bindAddress != null && !ANY_SOCKET.equals(bindAddress)) {
+                socket = new DatagramSocket(socketAddressToJdkSocket(bindAddress));
+            } else {
+                socket = new DatagramSocket();
+            }
+        } catch (SocketException ex) {
+            doCatchException(channelContext, ex);
+            return;
+        }
+
+        remoteAddress = connParams.getRemoteAddress();
+        if (remoteAddress == null) {
+            remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+        }
+
+        java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress();
+        remoteSocket = socketAddressToJdkSocket(remoteAddress);
+        localAddress = jdkSocketToSocketAddress(localSocketAddress);
+
+        channelContext = new DatagramSocketChannelContext(this);
+        try {
+            handler.channelRegistered(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+
+        try {
+            socket.connect(remoteSocket);
+            handler.channelActive(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+    }
+
+    public void close() {
+        try {
+            socket.disconnect();
+        } catch (Exception ignore) {
+
+        }
+
+        try {
+            socket.close();
+        } catch (Exception ignore) {
+
+        }
+    }
+
+    public void write(byte[] data, int offset, int length) {
+        try {
+            handler.dataWriteStart(channelContext);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+            }
+            // Send only the requested slice: 'length' bytes starting at 'offset', not the whole array.
+            DatagramPacket packet = new DatagramPacket(data, offset, length, remoteSocket);
+            socket.send(packet);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Written {}B ({} -> {})", length,
+                        channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+            }
+
+            try {
+                handler.dataWritten(channelContext);
+                logger.debug("Write handled.");
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java	Mon Jul 29 12:54:31 2019 +0200
@@ -0,0 +1,20 @@
+package com.passus.st.emitter.socket;
+
+public class DatagramSocketChannelContext<K> extends AbstractChannelContext<K> {
+
+    public DatagramSocketChannelContext(Connection connection) {
+        super(connection);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return true;
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Mon Jul 29 12:54:31 2019 +0200
@@ -0,0 +1,125 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.net.session.Session;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.metric.MetricsContainer;
+import com.passus.st.plugin.PluginConstants;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
+public class SocketEmitter implements Emitter {
+
+    private static final Logger LOGGER = LogManager.getLogger(SocketEmitter.class);
+
+    public static final String TYPE = "socket";
+
+    private static final int DEFAULT_NUM_THREADS = 4;
+
+    private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
+
+    private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000;
+
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
+
+    private Map<SessionInfo, Connection> connections = new HashMap<>();
+
+    private int maxThreads = DEFAULT_NUM_THREADS;
+
+    private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
+
+    private boolean started;
+
+    @Override
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        Assert.notNull(sessionMapper, "sessionMapper");
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    @Override
+    public void start() {
+        synchronized (this) {
+            if (started) {
+                return;
+            }
+
+            if (sessionMapper == null) {
+                sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
+            }
+
+            started = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        synchronized (this) {
+            if (!started) {
+                return;
+            }
+
+            started = false;
+        }
+    }
+
+    private void checkStarted() {
+        if (!started) { // 'started' is only true between start() and stop()
+            throw new IllegalStateException("SocketEmitter is not started.");
+        }
+    }
+
+
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
+        synchronized (this) {
+            checkStarted();
+            if (connections.containsKey(sessionInfo)) {
+                throw new IOException("Connection established already.");
+            }
+
+            if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
+                throw new UnsupportedOperationException("Not implemented yet.");
+            } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
+                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper);
+                connection.start();
+                connections.put(sessionInfo, connection);
+            }
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+
+    }
+
+
+}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Mon Jul 29 12:54:31 2019 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.commons.service.ServiceUtils;
 import com.passus.st.AbstractWireMockTest;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.emitter.RuleBasedSessionMapper;
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.utils.EventUtils;
@@ -38,6 +39,7 @@
 
     @Test
     public void testHandle_HTTP() throws Exception {
+        Log4jConfigurationFactory.enableFactory("debug");
         Map<String, Object> props = new HashMap<>();
         props.put("allowPartialSession", true);
         props.put("ports", 4214);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Mon Jul 29 12:54:31 2019 +0200
@@ -131,6 +131,8 @@
 
         protected boolean bidirectional = true;
 
+        private Object attachment;
+
         public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
             this.emitter = emitter;
             this.handler = handler;
@@ -198,6 +200,15 @@
             return sessionInfo;
         }
 
+        @Override
+        public Object getAttachment() { // value last passed to setAttachment, or null if never set
+            return attachment;
+        }
+
+        @Override
+        public void setAttachment(Object attachment) { // replaces any previously stored attachment
+            this.attachment = attachment;
+        }
     }
 
     private SynchFlowWorker createWorker() {
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Thu Jul 25 13:34:42 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Mon Jul 29 12:54:31 2019 +0200
@@ -10,6 +10,7 @@
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -75,7 +76,7 @@
         }
 
         try {
-            DatagramSocket serverSocket = new DatagramSocket(port, InetAddress.getByName(host));
+            serverSocket = new DatagramSocket(port, InetAddress.getByName(host));
             LOGGER.debug("Datagram socket created.");
             serverThread = new ServerThread(serverSocket);
             serverThread.start();
@@ -91,11 +92,16 @@
             return;
         }
 
+        try {
+            serverSocket.close();
+        } catch (Exception e) { // shutdown continues regardless; report through the class logger
+            LOGGER.error(e.getMessage(), e);
+        }
+
         serverThread.working = false;
-        serverSocket.disconnect();
         serverThread.interrupt();
         try {
-            serverThread.join(5_000);
+            serverThread.join();
         } catch (InterruptedException ignore) {
 
         }
@@ -120,6 +126,10 @@
             while (working) {
                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                 try {
+                    if (serverSocket.isClosed()) {
+                        break;
+                    }
+
                     serverSocket.receive(packet);
                     if (packet.getLength() > 0) {
                         if (decoder != null) {
@@ -142,6 +152,16 @@
                             }
                         }
                     }
+                } catch (SocketException e) {
+                    if (serverSocket.isClosed()) { // ask the socket itself; exception text may be null or vary
+                        break;
+                    }
+
+                    if (listener != null) {
+                        listener.onError(e);
+                    }
+
+                    LOGGER.error(e.getMessage(), e);
                 } catch (IOException e) {
                     if (listener != null) {
                         listener.onError(e);
@@ -150,6 +170,8 @@
                     LOGGER.error(e.getMessage(), e);
                 }
             }
+
+            LOGGER.debug("Server thread stopped.");
         }
     }