changeset 1001:7402a22cba53

UnidirectionalUdpPcapEmitter in progress
author Devel 2
date Fri, 04 Oct 2019 12:40:06 +0200
parents b104ead5034e
children 43f0378228df
files stress-tester/src/main/java/com/passus/st/emitter/EmitterUtils.java stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java stress-tester/src/test/java/com/passus/st/utils/NioServer.java stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java
diffstat 17 files changed, 1016 insertions(+), 138 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterUtils.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,231 @@
+package com.passus.st.emitter;
+
+import com.passus.net.Ip4Address;
+import com.passus.net.IpAddress;
+import com.passus.net.IpSubnet;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.emitter.socket.AbstractChannelContext;
+import org.apache.logging.log4j.Logger;
+
+import java.net.InetAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.List;
+
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
+
+/**
+ * Static helpers shared by emitter implementations: session mapping, error
+ * reporting and network interface lookup.
+ */
+public class EmitterUtils {
+
+    public static final String LOOPBACK_DEVICE_NAME = "lo";
+
+    private EmitterUtils() {
+    }
+
+    /**
+     * Maps a session to its connection parameters using {@code sessionMapper}.
+     * <p>
+     * When the mapper returns {@code null} the failure is counted under
+     * {@code BIND_MAPPER_SESSION_INVALID} (if metrics are collected) and the
+     * handler is told that the session was invalidated.
+     *
+     * @param sessionInfo    session to map
+     * @param handler        handler notified when the session cannot be mapped
+     * @param sessionMapper  mapper producing the connection parameters
+     * @param collectMetrics whether the failure should be recorded in {@code metric}
+     * @param metric         metric to update; ignored when {@code null}
+     * @param logger         logger used for debug output
+     * @return the mapped parameters, or {@code null} when the session cannot be mapped
+     */
+    public static final ConnectionParams getConnectionParams(SessionInfo sessionInfo, EmitterHandler handler,
+                                                             SessionMapper sessionMapper,
+                                                             boolean collectMetrics, EmitterMetric metric,
+                                                             Logger logger) {
+        ConnectionParams connParams = sessionMapper.map(sessionInfo);
+        if (connParams == null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Unable to map session '{}'.", sessionInfo);
+            }
+
+            // A missing metric must not turn a mapping failure into a NullPointerException.
+            if (collectMetrics && metric != null) {
+                synchronized (metric) {
+                    metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
+                }
+            }
+
+            try {
+                handler.sessionInvalidated(sessionInfo);
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+
+            return null;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        return connParams;
+    }
+
+    /**
+     * Logs an error, records it in the metric (if enabled) and forwards it to the handler.
+     * Exceptions thrown by the handler itself are logged and not propagated.
+     *
+     * @param channelContext context the error relates to; passed to the handler as is
+     * @param cause          the error
+     * @param handler        handler notified through {@code errorOccurred}
+     * @param collectMetrics whether the error should be recorded in {@code metric}
+     * @param metric         metric to update; ignored when {@code null}
+     * @param logger         logger used for debug output
+     */
+    public static void doCatchException(ChannelContext channelContext, Throwable cause,
+                                        EmitterHandler handler,
+                                        boolean collectMetrics, EmitterMetric metric,
+                                        Logger logger) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
+        }
+
+        if (collectMetrics && metric != null) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
+
+        try {
+            handler.errorOccurred(channelContext, cause);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Finds the first interface that is up and whose first address is neither
+     * loopback nor link-local.
+     * <p>
+     * Only the first address of each interface is inspected, so callers may use
+     * that first address as a local address that is neither loopback nor link-local.
+     *
+     * @return the matching interface, or {@code null} when there is none
+     * @throws SocketException if the interfaces cannot be enumerated or queried
+     */
+    public static NetworkInterface findFirstUpNetworkInterface() throws SocketException {
+        Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
+        if (nets == null) {
+            return null;
+        }
+
+        while (nets.hasMoreElements()) {
+            NetworkInterface ni = nets.nextElement();
+            // The method promises an interface that is up, so skip the ones that are down.
+            if (!ni.isUp()) {
+                continue;
+            }
+
+            Enumeration<InetAddress> addrs = ni.getInetAddresses();
+            if (addrs.hasMoreElements()) {
+                InetAddress first = addrs.nextElement();
+                if (!first.isLoopbackAddress() && !first.isLinkLocalAddress()) {
+                    return ni;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Parses {@code address} and delegates to {@link #findNetworkInterface(IpAddress)}.
+     *
+     * @param address IP address in textual form
+     * @return the matching interface, or {@code null} when there is none
+     * @throws SocketException if the interfaces cannot be enumerated
+     */
+    public static NetworkInterface findNetworkInterface(String address) throws SocketException {
+        return findNetworkInterface(IpAddress.parse(address));
+    }
+
+    /**
+     * Finds the interface that has an IPv4 address whose subnet contains {@code address}.
+     * Only IPv4 is supported; any other address yields {@code null}.
+     *
+     * @param address address to look up
+     * @return the matching interface, or {@code null} when there is none
+     * @throws SocketException if the interfaces cannot be enumerated
+     */
+    public static NetworkInterface findNetworkInterface(IpAddress address) throws SocketException {
+        // The subnet check below casts to Ip4Address, so reject everything else up front.
+        if (address == null || address.getVersion() != IpAddress.IP4) {
+            return null;
+        }
+
+        Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
+        if (nets == null) {
+            return null;
+        }
+
+        while (nets.hasMoreElements()) {
+            NetworkInterface ni = nets.nextElement();
+            List<InterfaceAddress> addrs = ni.getInterfaceAddresses();
+            for (InterfaceAddress ifcAddr : addrs) {
+                IpAddress nAddr = AddressUtils.getIpAddress(ifcAddr.getAddress().getAddress());
+                if (nAddr.getVersion() == IpAddress.IP4) {
+                    IpSubnet subnet = IpSubnet.fromCidr(nAddr.getAddress()[0], ifcAddr.getNetworkPrefixLength());
+                    if (subnet.inRange((Ip4Address) address)) {
+                        return ni;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Prints the names and the non link-local addresses of an interface to standard output.
+     *
+     * @param ni interface to describe
+     */
+    public static void displayInterfaceInformation(NetworkInterface ni) throws SocketException {
+        System.out.printf("Display name: %s\n", ni.getDisplayName());
+        System.out.printf("Name: %s\n", ni.getName());
+        List<InterfaceAddress> ifcAddresses = ni.getInterfaceAddresses();
+        for (InterfaceAddress ifcAddress : ifcAddresses) {
+            if (ifcAddress.getAddress().isLinkLocalAddress()) {
+                continue;
+            }
+
+            System.out.printf("IfcAddress: %s\n", ifcAddress);
+        }
+
+        System.out.printf("\n");
+    }
+
+    /**
+     * Manual smoke test: prints the interface matching a sample address and the first usable interface.
+     */
+    public static void main(String[] args) throws SocketException {
+        NetworkInterface ni = findNetworkInterface("192.168.2.20");
+        if (ni != null) {
+            displayInterfaceInformation(ni);
+        } else {
+            System.out.println("Not found.");
+        }
+
+        ni = findFirstUpNetworkInterface();
+        if (ni != null) {
+            displayInterfaceInformation(ni);
+        } else {
+            System.out.println("Not found.");
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java	Fri Oct 04 12:40:06 2019 +0200
@@ -2,21 +2,6 @@
 
 public class StatelessTasks {
 
-    protected static class Task {
-
-        public static final int CONNECT = 1;
-        public static final int READ = 2;
-        public static final int FLUSH = 3;
-        public static final int CLOSE = 4;
-
-        final int code;
-
-        public Task(int code) {
-            this.code = code;
-        }
-
-    }
-
     public static final Task CONNECT_TASK = new Task(Task.CONNECT);
     public static final Task READ_TASK = new Task(Task.READ);
     public static final Task FLUSH_TASK = new Task(Task.FLUSH);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,174 @@
+package com.passus.st.emitter.pcap;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.MACAddress;
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@link ChannelContext} of a send-only PCAP session.
+ * <p>
+ * Data passed to {@code write} is copied into an in-memory queue, while
+ * {@code flush} and {@code close} only hand a request over to the owning worker.
+ *
+ * @param <K> type of the attachment
+ */
+public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> {
+
+    private final UnidirectionalPcapWorker worker;
+
+    private final Queue<byte[]> dataQueue;
+
+    private K attachment;
+
+    private final EmitterHandler handler;
+
+    private SessionInfo sessionInfo;
+
+    private final SocketAddress localAddress;
+
+    private final SocketAddress remoteAddress;
+
+    private final MACAddress localHardwareAddress;
+
+    private final MACAddress remoteHardwareAddress;
+
+    /**
+     * Creates a context bound to the given worker and session.
+     *
+     * @param worker                worker that receives flush and close requests
+     * @param handler               handler associated with the session
+     * @param sessionInfo           session this context belongs to
+     * @param localAddress          local socket address
+     * @param remoteAddress         remote socket address
+     * @param localHardwareAddress  local MAC address
+     * @param remoteHardwareAddress remote MAC address
+     */
+    public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker,
+                                            EmitterHandler handler,
+                                            SessionInfo sessionInfo,
+                                            SocketAddress localAddress, SocketAddress remoteAddress,
+                                            MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) {
+        this.dataQueue = new LinkedList<>();
+        this.worker = worker;
+        this.handler = handler;
+        this.sessionInfo = sessionInfo;
+        this.localHardwareAddress = localHardwareAddress;
+        this.remoteHardwareAddress = remoteHardwareAddress;
+        this.localAddress = localAddress;
+        this.remoteAddress = remoteAddress;
+    }
+
+    /** Returns the local socket address given at construction. */
+    @Override
+    public SocketAddress getLocalAddress() {
+        return this.localAddress;
+    }
+
+    /** Returns the remote socket address given at construction. */
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return this.remoteAddress;
+    }
+
+    /** Returns the session this context belongs to. */
+    @Override
+    public SessionInfo getSessionInfo() {
+        return this.sessionInfo;
+    }
+
+    /** Returns the handler given at construction. */
+    public EmitterHandler getHandler() {
+        return this.handler;
+    }
+
+    /** Returns the local MAC address given at construction. */
+    public MACAddress getLocalHardwareAddress() {
+        return this.localHardwareAddress;
+    }
+
+    /** Returns the remote MAC address given at construction. */
+    public MACAddress getRemoteHardwareAddress() {
+        return this.remoteHardwareAddress;
+    }
+
+    @Override
+    public void setAttachment(K attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public K getAttachment() {
+        return this.attachment;
+    }
+
+    /** Always {@code false}: this context only sends. */
+    @Override
+    public boolean isBidirectional() {
+        return false;
+    }
+
+    /**
+     * Accepts only {@code false}.
+     *
+     * @throws IllegalArgumentException when {@code bidirectional} is {@code true}
+     */
+    @Override
+    public void setBidirectional(boolean bidirectional) {
+        if (!bidirectional) {
+            return;
+        }
+
+        throw new IllegalArgumentException("Bidirectional communication not supported.");
+    }
+
+    /** Always {@code true}: no connection state is tracked here. */
+    @Override
+    public boolean isConnected() {
+        return true;
+    }
+
+    /** Always {@code false}. */
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+    /**
+     * Queues a copy of {@code length} bytes of {@code data} starting at {@code offset}.
+     */
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        byte[] chunk = new byte[length];
+        System.arraycopy(data, offset, chunk, 0, length);
+        dataQueue.add(chunk);
+    }
+
+    /** Queues the bytes returned by {@code data.toArray()}. */
+    @Override
+    public void write(ByteBuff data) throws IOException {
+        byte[] chunk = data.toArray();
+        dataQueue.add(chunk);
+    }
+
+    /** Asks the worker to flush this session. */
+    @Override
+    public void flush() throws IOException {
+        worker.flush(sessionInfo);
+    }
+
+    /** Asks the worker to close this session. */
+    @Override
+    public void close() throws IOException {
+        worker.close(sessionInfo);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,312 @@
+package com.passus.st.emitter.pcap;
+
+import com.passus.commons.Assert;
+import com.passus.net.IpAddress;
+import com.passus.net.MACAddress;
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.*;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.ConnectTask;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask;
+import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress;
+import static com.passus.st.emitter.EmitterUtils.getConnectionParams;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+/**
+ * Worker thread serving the connect, flush and close requests queued for the
+ * sessions assigned to it. Requests are taken from a FIFO queue and executed
+ * one at a time on this thread.
+ * <p>
+ * Work in progress: FLUSH requests are accepted but not processed yet.
+ */
+class UnidirectionalPcapWorker extends Thread implements MetricSource {
+
+    protected final Logger LOGGER = LogManager.getLogger(UnidirectionalPcapWorker.class);
+
+    /** Marks "no port borrowed"; same value nextFreeLocalPort() treats as pool exhaustion. */
+    private static final int NO_PORT = -1;
+
+    // Cleared by the thread that stops the emitter and read by this thread.
+    volatile boolean working = true;
+
+    protected final BlockingQueue<UnidirectionalTask> tasks = new LinkedBlockingQueue<>();
+
+    private Map<SessionInfo, UnidirectionalPcapChannelContext> sessions = new HashMap<>();
+
+    // Local ports taken from the pool, keyed by session, so that doClose() returns
+    // exactly what doConnect() borrowed and nothing else.
+    private final Map<SessionInfo, Integer> borrowedPorts = new HashMap<>();
+
+    private SessionMapper sessionMapper;
+
+    private volatile boolean collectMetrics = false;
+
+    // Not assigned anywhere in this class yet; every use is guarded against null.
+    private EmitterMetric metric;
+
+    private PortPool portPool = new PortPoolImpl();
+
+    private MACAddressResolver macResolver;
+
+    public PortPool getPortPool() {
+        return portPool;
+    }
+
+    public void setPortPool(PortPool portPool) {
+        Assert.notNull(portPool, "portPool");
+        this.portPool = portPool;
+    }
+
+    public MACAddressResolver getMacResolver() {
+        return macResolver;
+    }
+
+    public void setMacResolver(MACAddressResolver macResolver) {
+        Assert.notNull(macResolver, "macResolver");
+        this.macResolver = macResolver;
+    }
+
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        Assert.notNull(sessionMapper, "sessionMapper");
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /** Publishes and resets the metric; does nothing when metrics are off or no metric exists. */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics && metric != null) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset();
+        }
+    }
+
+    private void doCatchException(ChannelContext channelContext, EmitterHandler handler, Throwable cause) {
+        EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER);
+    }
+
+    private NetworkInterface findFirstUpInterface() throws IOException {
+        NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface();
+        if (ni == null) {
+            throw new IOException("Unable to find network interface.");
+        }
+
+        return ni;
+    }
+
+    private NetworkInterface findInterface(IpAddress address) throws IOException {
+        NetworkInterface ni = EmitterUtils.findNetworkInterface(address);
+        if (ni == null) {
+            throw new IOException("Unable to find network interface.");
+        }
+
+        return ni;
+    }
+
+    boolean containsContext(SessionInfo sessionInfo) {
+        return sessions.containsKey(sessionInfo);
+    }
+
+    private UnidirectionalPcapChannelContext findContext(SessionInfo sessionInfo) {
+        UnidirectionalPcapChannelContext context = sessions.get(sessionInfo);
+        if (context == null) {
+            throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo);
+        }
+
+        return context;
+    }
+
+    private int nextFreeLocalPort() throws IOException {
+        int port = portPool.borrow();
+        if (port == NO_PORT) {
+            throw new IOException("Bind error. Unable to get local port.");
+        }
+
+        return port;
+    }
+
+    void connect(SessionInfo sessionInfo, EmitterHandler handler) {
+        tasks.add(new ConnectTask(sessionInfo, handler));
+    }
+
+    void flush(SessionInfo sessionInfo) {
+        tasks.add(new FlushTask(sessionInfo));
+    }
+
+    void close(SessionInfo sessionInfo) {
+        tasks.add(new CloseTask(sessionInfo));
+    }
+
+    /**
+     * Creates and registers the channel context of a session, then notifies the handler.
+     * <p>
+     * Any failure while resolving addresses is reported through
+     * {@code errorOccurred}; a port borrowed up to that point is returned to the pool.
+     */
+    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER);
+        if (connParams == null) {
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        SocketAddress localAddress = connParams.getBindAddress();
+        int borrowedPort = NO_PORT;
+        UnidirectionalPcapChannelContext channelContext;
+        try {
+            NetworkInterface networkInterface;
+            if (localAddress == null || ANY_SOCKET.equals(localAddress)) {
+                networkInterface = findFirstUpInterface();
+                InetAddress inetAddress = networkInterface.getInetAddresses().nextElement();
+                borrowedPort = nextFreeLocalPort();
+                localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), borrowedPort);
+            } else {
+                networkInterface = findInterface(localAddress.getIp());
+            }
+
+            // getHardwareAddress() returns null when the interface has no accessible MAC address.
+            byte[] hardwareAddress = networkInterface.getHardwareAddress();
+            if (hardwareAddress == null) {
+                throw new IOException("Network interface '" + networkInterface.getName() + "' has no hardware address.");
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            MACAddress localMac = new MACAddress(hardwareAddress);
+            MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp());
+            channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac);
+        } catch (IOException | RuntimeException ex) {
+            if (borrowedPort != NO_PORT) {
+                portPool.release(borrowedPort);
+            }
+
+            doCatchException(null, handler, ex);
+            return;
+        }
+
+        sessions.put(sessionInfo, channelContext);
+        if (borrowedPort != NO_PORT) {
+            borrowedPorts.put(sessionInfo, borrowedPort);
+        }
+
+        try {
+            handler.channelRegistered(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, handler, ex);
+        }
+
+        try {
+            handler.channelActive(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, handler, ex);
+        }
+    }
+
+    /** Unregisters a session, returns its borrowed port (if any) and notifies the handler. */
+    private void doClose(SessionInfo sessionInfo) {
+        UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo);
+        if (channelContext == null) {
+            LOGGER.debug("Unable to find context for session " + sessionInfo + ".");
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closing session '" + sessionInfo + "'.");
+        }
+
+        EmitterHandler handler = channelContext.getHandler();
+        // Return the port doConnect() borrowed, not the source port recorded in the session.
+        Integer borrowedPort = borrowedPorts.remove(sessionInfo);
+        if (borrowedPort != null) {
+            portPool.release(borrowedPort);
+        }
+
+        try {
+            handler.channelInactive(channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        sessions.remove(sessionInfo);
+        try {
+            handler.channelUnregistered(channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics && metric != null) {
+            metric.incClosedConnections();
+        }
+    }
+
+    /** Processes queued tasks until {@code working} is cleared. */
+    @Override
+    public void run() {
+        while (working) {
+            UnidirectionalTask task;
+            try {
+                task = tasks.take();
+            } catch (InterruptedException ignored) {
+                // The emitter clears 'working' and then interrupts this thread; re-check the flag.
+                continue;
+            }
+
+            try {
+                switch (task.code) {
+                    case Task.CONNECT:
+                        ConnectTask connectTask = (ConnectTask) task;
+                        doConnect(connectTask.sessionInfo, connectTask.handler);
+                        break;
+                    case Task.CLOSE:
+                        CloseTask closeTask = (CloseTask) task;
+                        doClose(closeTask.sessionInfo);
+                        break;
+                    default:
+                        // FLUSH is queued by flush(), but sending the queued data is not implemented yet.
+                        break;
+                }
+            } catch (RuntimeException ex) {
+                // Keep the worker alive: one failing task must not stop the remaining sessions.
+                LOGGER.error(ex.getMessage(), ex);
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,52 @@
+package com.passus.st.emitter.pcap;
+
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.Task;
+
+/**
+ * Task objects queued for {@code UnidirectionalPcapWorker}; each carries the session it refers to.
+ */
+public class UnidirectionalTasks {
+
+    /** Base task: a task code plus the session it applies to. */
+    protected static class UnidirectionalTask extends Task {
+
+        final SessionInfo sessionInfo;
+
+        public UnidirectionalTask(int code, SessionInfo sessionInfo) {
+            super(code);
+            this.sessionInfo = sessionInfo;
+        }
+    }
+
+    /** Request to open a session; also carries the session's handler. */
+    protected static final class ConnectTask extends UnidirectionalTask {
+
+        final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) {
+            super(Task.CONNECT, sessionInfo);
+            this.handler = handler;
+        }
+
+    }
+
+    /** Request to close a session. */
+    protected static final class CloseTask extends UnidirectionalTask {
+
+        public CloseTask(SessionInfo sessionInfo) {
+            super(Task.CLOSE, sessionInfo);
+        }
+
+    }
+
+    /** Request to flush a session. */
+    protected static final class FlushTask extends UnidirectionalTask {
+
+        public FlushTask(SessionInfo sessionInfo) {
+            super(Task.FLUSH, sessionInfo);
+        }
+
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,169 @@
+package com.passus.st.emitter.pcap;
+
+import com.passus.commons.Assert;
+import com.passus.st.emitter.*;
+import com.passus.st.metric.MetricsContainer;
+
+import java.io.IOException;
+
+/**
+ * Emitter that spreads sessions over a fixed set of {@link UnidirectionalPcapWorker} threads.
+ * <p>
+ * Work in progress: {@link #writeMetrics(MetricsContainer)} is not implemented and
+ * {@code collectMetrics} is not forwarded to the workers.
+ */
+public class UnidirectionalUdpPcapEmitter implements Emitter {
+
+    private static final int DEFAULT_WORKERS_NUM = 4;
+
+    private boolean started = false;
+
+    private MACAddressResolver macAddressResolver;
+
+    private UnidirectionalPcapWorker[] workers;
+
+    private int workersNum = DEFAULT_WORKERS_NUM;
+
+    private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl());
+
+    private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
+
+    private boolean collectMetrics = false;
+
+    @Override
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        Assert.notNull(sessionMapper, "sessionMapper");
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public int getWorkersNum() {
+        return workersNum;
+    }
+
+    /** Sets the number of workers created by the next {@link #start()}. */
+    public void setWorkersNum(int workersNum) {
+        Assert.greaterThanZero(workersNum, "workersNum");
+        this.workersNum = workersNum;
+    }
+
+    public MACAddressResolver getMacAddressResolver() {
+        return macAddressResolver;
+    }
+
+    public void setMacAddressResolver(MACAddressResolver macAddressResolver) {
+        Assert.notNull(macAddressResolver, "macAddressResolver");
+        this.macAddressResolver = macAddressResolver;
+    }
+
+    public PortPool getPortPool() {
+        return portPool;
+    }
+
+    /** Sets the pool of local ports; it is wrapped in a {@code SynchronizedPortPoolWrapper}. */
+    public void setPortPool(PortPool portPool) {
+        Assert.notNull(portPool, "portPool");
+        this.portPool = new SynchronizedPortPoolWrapper(portPool);
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        throw new RuntimeException("Not implemented yet.");
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Creates and starts the workers. Does nothing when already started.
+     *
+     * @throws IllegalStateException when no MAC address resolver has been set
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        // Fail before any thread is started, so a misconfigured emitter leaves no workers behind.
+        if (macAddressResolver == null) {
+            throw new IllegalStateException("macAddressResolver is not set.");
+        }
+
+        UnidirectionalPcapWorker[] created = new UnidirectionalPcapWorker[workersNum];
+        for (int i = 0; i < created.length; i++) {
+            UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker();
+            worker.setPortPool(portPool);
+            worker.setMacResolver(macAddressResolver);
+            worker.setSessionMapper(sessionMapper);
+            worker.start();
+            created[i] = worker;
+        }
+
+        workers = created;
+        started = true;
+    }
+
+    /** Stops all workers and waits for them to finish. Does nothing when not started. */
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        boolean interrupted = false;
+        // Iterate the array built by start(); workersNum may have been changed since then.
+        for (UnidirectionalPcapWorker worker : workers) {
+            worker.working = false;
+            worker.interrupt();
+
+            try {
+                worker.join();
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+
+        workers = null;
+        started = false;
+
+        if (interrupted) {
+            // Restore the interrupt status for the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Hands the session over to one of the workers, chosen from the session hash and {@code workerIndex}.
+     *
+     * @throws IllegalStateException when the emitter is not started
+     */
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
+        UnidirectionalPcapWorker[] current = workers;
+        if (current == null) {
+            throw new IllegalStateException("Emitter is not started.");
+        }
+
+        int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
+        int index = hashCode % current.length;
+        current[index].connect(sessionInfo, handler);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Fri Oct 04 12:40:06 2019 +0200
@@ -9,8 +9,8 @@
 import java.util.LinkedList;
 import java.util.Queue;
 
-import static com.passus.st.emitter.socket.Connection.CLOSE_TASK;
-import static com.passus.st.emitter.socket.Connection.FLUSH_TASK;
+import static com.passus.st.emitter.StatelessTasks.CLOSE_TASK;
+import static com.passus.st.emitter.StatelessTasks.FLUSH_TASK;
 
 public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Fri Oct 04 12:40:06 2019 +0200
@@ -3,10 +3,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.data.HeapByteBuff;
 import com.passus.net.SocketAddress;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.EmitterMetric;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.*;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -16,8 +13,6 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
-
 public abstract class Connection extends Thread {
 
     protected final Logger logger = LogManager.getLogger(getClass());
@@ -59,50 +54,11 @@
     public abstract boolean isConnectionPending();
 
     protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) {
-        ConnectionParams connParams = sessionMapper.map(sessionInfo);
-        if (connParams == null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Unable to map session '{}'.", sessionInfo);
-            }
-
-            if (collectMetrics) {
-                synchronized (metric) {
-                    metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
-                }
-            }
-
-            try {
-                handler.sessionInvalidated(sessionInfo);
-            } catch (Exception e) {
-                logger.debug(e.getMessage(), e);
-            }
-
-            return null;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
-        }
-
-        return connParams;
+        return EmitterUtils.getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, logger);
     }
 
     protected void doCatchException(AbstractChannelContext channelContext, Throwable cause) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Error occurred. " + cause.getMessage(), cause);
-        }
-
-        if (collectMetrics) {
-            synchronized (metric) {
-                metric.errorCaught(cause);
-            }
-        }
-
-        try {
-            handler.errorOccurred(channelContext, cause);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
+        EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, logger);
     }
 
     public abstract void connect();
@@ -214,24 +170,5 @@
         }
     }
 
-    protected static class Task {
-
-        public static final int CONNECT = 1;
-        public static final int READ = 2;
-        public static final int FLUSH = 3;
-        public static final int CLOSE = 4;
-
-        final int code;
-
-        public Task(int code) {
-            this.code = code;
-        }
-
-    }
-
-    static final Task CONNECT_TASK = new Task(Task.CONNECT);
-    static final Task READ_TASK = new Task(Task.READ);
-    static final Task FLUSH_TASK = new Task(Task.FLUSH);
-    static final Task CLOSE_TASK = new Task(Task.CLOSE);
 
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Fri Oct 04 12:40:06 2019 +0200
@@ -8,6 +8,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -63,7 +64,7 @@
         }
 
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+            LOGGER.debug("Registering TCP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
         }
 
         try {
@@ -107,13 +108,19 @@
         connectionPending = false;
     }
 
-    @Override
-    public void close() {
+    private void closeQuietly(Closeable c) {
         try {
-            socket.close();
+            c.close();
         } catch (Exception ignore) {
 
         }
+    }
+
+    @Override
+    public void close() {
+        closeQuietly(socket);
+        closeQuietly(in);
+        closeQuietly(out);
 
         if (collectMetrics) {
             metric.incClosedConnections();
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Fri Oct 04 12:40:06 2019 +0200
@@ -89,7 +89,7 @@
             connections.forEach((sessionInfo, conn) -> {
                 if (conn.isConnected()) {
                     try {
-                        conn.tasks.put(Connection.CLOSE_TASK);
+                        conn.tasks.put(StatelessTasks.CLOSE_TASK);
                     } catch (InterruptedException e) {
 
                     }
@@ -155,7 +155,7 @@
                 throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + ".");
             }
 
-            connection.tasks.add(Connection.CONNECT_TASK);
+            connection.tasks.add(StatelessTasks.CONNECT_TASK);
             connection.start();
             connections.put(sessionInfo, connection);
         }
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Fri Oct 04 12:40:06 2019 +0200
@@ -1,15 +1,15 @@
 package com.passus.st.client;
 
-import com.passus.st.emitter.ChannelContext;
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
+import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
+
 import java.util.LinkedList;
 import java.util.Queue;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class TestClientHandler implements EmitterHandler {
@@ -25,8 +25,12 @@
         USER
     }
 
+    public static final int DEFAULT_TIMEOUT = 5_000;
+
     private final LinkedList<ClientEvent> events = new LinkedList<>();
 
+    private ChannelContext channelContext;
+
     private boolean channelUnregistered = false;
 
     public boolean isChannelUnregistered() {
@@ -34,48 +38,99 @@
     }
 
     public void clear() {
-        channelUnregistered = false;
-        events.clear();
+        synchronized (this) {
+            channelUnregistered = false;
+            events.clear();
+            channelContext = null;
+        }
+    }
+
+    public ChannelContext getChannelContext() {
+        synchronized (this) {
+            return channelContext;
+        }
     }
 
     public Queue<ClientEvent> events() {
-        return new LinkedList<>(events);
+        synchronized (this) {
+            return new LinkedList<>(events);
+        }
+    }
+
+    public ClientEvent findFirst(EventType eventType) {
+        synchronized (this) {
+            for (ClientEvent event : events) {
+                if (event.getType() == eventType) {
+                    return event;
+                }
+            }
+
+            return null;
+        }
+    }
+
+    public boolean contains(EventType eventType) {
+        synchronized (this) {
+            for (ClientEvent event : events) {
+                if (event.getType() == eventType) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
     }
 
     public ClientEvent get(int index) {
-        return events.get(index);
+        synchronized (this) {
+            return events.get(index);
+        }
     }
 
     public int size() {
-        return events.size();
+        synchronized (this) {
+            return events.size();
+        }
     }
 
     public boolean isEmpty() {
-        return events.isEmpty();
+        synchronized (this) {
+            return events.isEmpty();
+        }
     }
 
     public ClientEvent getFirst() {
-        return events.getFirst();
+        synchronized (this) {
+            return events.getFirst();
+        }
     }
 
     public ClientEvent getLast() {
-        return events.getLast();
+        synchronized (this) {
+            return events.getLast();
+        }
     }
 
     public void add(ClientEvent event) {
-        events.add(event);
+        synchronized (this) {
+            events.add(event);
+        }
     }
 
     public void add(EventType type, ChannelContext context) {
-        events.add(ClientEvent.create(type, context));
+        synchronized (this) {
+            events.add(ClientEvent.create(type, context));
+        }
     }
 
     @Override
     public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context);
-        event.setCause(cause);
-        add(event);
-        doErrorOccured(context, cause);
+        synchronized (this) {
+            ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context);
+            event.setCause(cause);
+            add(event);
+            doErrorOccured(context, cause);
+        }
     }
 
     protected void doErrorOccured(ChannelContext context, Throwable cause) throws Exception {
@@ -84,9 +139,11 @@
 
     @Override
     public final void dataWritten(ChannelContext context) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.DATA_WRITTEN, context);
-        add(event);
-        doDataWritten(context);
+        synchronized (this) {
+            ClientEvent event = ClientEvent.create(EventType.DATA_WRITTEN, context);
+            add(event);
+            doDataWritten(context);
+        }
     }
 
     protected void doDataWritten(ChannelContext context) throws Exception {
@@ -95,10 +152,12 @@
 
     @Override
     public final void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context);
-        event.setData(data);
-        add(event);
-        doDataReceived(context, data);
+        synchronized (this) {
+            ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context);
+            event.setData(data);
+            add(event);
+            doDataReceived(context, data);
+        }
     }
 
     protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception {
@@ -107,9 +166,12 @@
 
     @Override
     public final void channelInactive(ChannelContext context) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.CHANNEL_INACTIVE, context);
-        add(event);
-        doChannelInactive(context);
+        synchronized (this) {
+            channelContext = context;
+            ClientEvent event = ClientEvent.create(EventType.CHANNEL_INACTIVE, context);
+            add(event);
+            doChannelInactive(context);
+        }
     }
 
     protected void doChannelInactive(ChannelContext context) throws Exception {
@@ -118,9 +180,11 @@
 
     @Override
     public final void channelActive(ChannelContext context) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.CHANNEL_ACTIVE, context);
-        add(event);
-        doChannelActive(context);
+        synchronized (this) {
+            ClientEvent event = ClientEvent.create(EventType.CHANNEL_ACTIVE, context);
+            add(event);
+            doChannelActive(context);
+        }
     }
 
     protected void doChannelActive(ChannelContext context) throws Exception {
@@ -129,10 +193,12 @@
 
     @Override
     public final void channelUnregistered(ChannelContext context) throws Exception {
-        channelUnregistered = true;
-        ClientEvent event = ClientEvent.create(EventType.CHANNEL_UNREGISTERED, context);
-        add(event);
-        doChannelUnregistered(context);
+        synchronized (this) {
+            channelUnregistered = true;
+            ClientEvent event = ClientEvent.create(EventType.CHANNEL_UNREGISTERED, context);
+            add(event);
+            doChannelUnregistered(context);
+        }
     }
 
     protected void doChannelUnregistered(ChannelContext context) throws Exception {
@@ -141,15 +207,39 @@
 
     @Override
     public void channelRegistered(ChannelContext context) throws Exception {
-        ClientEvent event = ClientEvent.create(EventType.CHANNEL_REGISTERED, context);
-        add(event);
-        doChannelRegistered(context);
+        synchronized (this) {
+            ClientEvent event = ClientEvent.create(EventType.CHANNEL_REGISTERED, context);
+            add(event);
+            doChannelRegistered(context);
+        }
     }
 
     protected void doChannelRegistered(ChannelContext context) throws Exception {
 
     }
 
+    public void waitForEvent(EventType type) {
+        waitForEvent(type, DEFAULT_TIMEOUT);
+    }
+
+    public void waitForEvent(EventType type, long timeout) {
+        synchronized (this) {
+            long start = System.currentTimeMillis();
+            while (!contains(type)) {
+                try {
+                    wait(5);
+                } catch (InterruptedException ignore) {
+
+                }
+
+                long now = System.currentTimeMillis();
+                if (now - start > timeout) {
+                    throw new RuntimeException("Wait timeout exceeded. Unable to find event '" + type + "'.");
+                }
+            }
+        }
+    }
+
     public static class ClientEvent {
 
         private final EventType type;
@@ -205,7 +295,12 @@
 
         @Override
         public String toString() {
-            return "ClientEvent{" + "type=" + type + ", sessionInfo=" + sessionInfo + ", localAddress=" + localAddress + ", remoteAddress=" + remoteAddress + ", data=" + data + ", cause=" + cause + '}';
+            return "ClientEvent{type=" + type + ", " +
+                    "sessionInfo=" + sessionInfo + ", " +
+                    "localAddress=" + localAddress + ", " +
+                    "remoteAddress=" + remoteAddress + ", " +
+                    "data=" + data + ", " +
+                    "cause=" + cause + '}';
         }
 
         private static ClientEvent create(EventType type, ChannelContext context) {
--- a/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java	Fri Oct 04 12:40:06 2019 +0200
@@ -125,7 +125,9 @@
         RuleBasedSessionMapper mapper = new RuleBasedSessionMapper();
         mapper.addRule(rule);
         List<Integer> localPorts = Arrays.asList(101, 102, 103);
-        List<SocketAddress> bindAddresses = localPorts.stream().map((lp) -> mapper.map(si(lp)).getBindAddress()).collect(Collectors.toList());
+        List<SocketAddress> bindAddresses = localPorts.stream()
+                .map((lp) -> mapper.map(si(lp)).getBindAddress())
+                .collect(Collectors.toList());
         assertEquals(expected, bindAddresses);
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,111 @@
+package com.passus.st.emitter.pcap;
+
+import com.passus.net.MACAddress;
+import com.passus.st.client.TestClientHandler;
+import com.passus.st.client.TestClientHandler.ClientEvent;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.MapBasedMACAddressResolver;
+import com.passus.st.emitter.PortPool;
+import com.passus.st.emitter.SessionInfo;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.text.ParseException;
+import java.util.Iterator;
+
+import static com.passus.st.client.TestClientHandler.EventType.*;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+
+public class UnidirectionalPcapWorkerTest {
+
+
+    private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver();
+
+    private final TestPortPool portPool = new TestPortPool();
+
+    @BeforeClass
+    public void beforeClass() throws ParseException {
+        macResolver.add("1.1.1.1 -> 11:11:11:11:11:11");
+        macResolver.add("2.2.2.2 -> 22:22:22:22:22:22");
+    }
+
+    private void startWorker(UnidirectionalPcapWorker worker) {
+        worker.start();
+        try {
+            Thread.sleep(200);
+        } catch (InterruptedException e) {
+
+        }
+    }
+
+    private void stopWorker(UnidirectionalPcapWorker worker) {
+        worker.working = false;
+        worker.interrupt();
+        try {
+            worker.join();
+        } catch (InterruptedException ignore) {
+
+        }
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+        UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker();
+        try {
+            worker.setMacResolver(macResolver);
+            worker.setPortPool(portPool);
+            worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER);
+            startWorker(worker);
+
+            TestClientHandler handler = new TestClientHandler();
+
+            SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80");
+            worker.connect(sessionInfo, handler);
+            handler.waitForEvent(CHANNEL_REGISTERED);
+            assertTrue(portPool.borrowed);
+
+            worker.close(sessionInfo);
+            handler.waitForEvent(CHANNEL_UNREGISTERED);
+
+            UnidirectionalPcapChannelContext channelContext = (UnidirectionalPcapChannelContext) handler.getChannelContext();
+            assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort());
+            assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress());
+
+            Iterator<ClientEvent> it = handler.events().iterator();
+            assertEquals(CHANNEL_REGISTERED, it.next().getType());
+            assertEquals(CHANNEL_ACTIVE, it.next().getType());
+            assertEquals(CHANNEL_INACTIVE, it.next().getType());
+            assertEquals(CHANNEL_UNREGISTERED, it.next().getType());
+            assertFalse(portPool.borrowed);
+        } finally {
+            stopWorker(worker);
+        }
+    }
+
+
+    private class TestPortPool implements PortPool {
+
+        public static final int PORT = 100;
+
+        boolean borrowed = false;
+
+        @Override
+        public int borrow() {
+            if (borrowed) {
+                return -1;
+            }
+
+            borrowed = true;
+            return PORT;
+        }
+
+        @Override
+        public void release(int port) {
+            if (borrowed) {
+                borrowed = false;
+            }
+        }
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java	Fri Oct 04 12:40:06 2019 +0200
@@ -0,0 +1,7 @@
+package com.passus.st.emitter.pcap;
+
+import static org.testng.Assert.*;
+
+public class UnidirectionalUdpPcapEmitterTest {
+
+}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/utils/NioServer.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/NioServer.java	Fri Oct 04 12:40:06 2019 +0200
@@ -46,6 +46,14 @@
         this.port = port;
     }
 
+    public ChannelHandler getServerHandler() {
+        return serverHandler;
+    }
+
+    public void setServerHandler(ChannelHandler serverHandler) {
+        this.serverHandler = serverHandler;
+    }
+
     public int getConnections() {
         return connectionsCounter.getConnections();
     }
@@ -99,10 +107,6 @@
             }
 
             channel = bs.bind(address, port);
-
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Server {}:{} started.", address, port);
-            }
             this.started = true;
         } catch (Exception ex) {
             LOGGER.error(ex.getMessage(), ex);
--- a/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java	Fri Oct 04 12:40:06 2019 +0200
@@ -1,7 +1,5 @@
 package com.passus.st.utils.server;
 
-import com.passus.data.ByteBuff;
-
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
@@ -27,10 +25,6 @@
         return clientPort;
     }
 
-    public void write(ByteBuff buff) throws IOException {
-        write(buff.buffer(), buff.startIndex(), buff.length());
-    }
-
     public void write(byte[] data, int offset, int length) throws IOException {
         DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort);
         serverSocket.send(packet);
--- a/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java	Thu Oct 03 14:49:05 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java	Fri Oct 04 12:40:06 2019 +0200
@@ -6,7 +6,9 @@
 
 public interface ServerContext {
 
-    void write(ByteBuff buff) throws IOException;
+    default void write(ByteBuff buff) throws IOException {
+        write(buff.buffer(), buff.startIndex(), buff.length());
+    }
 
     void write(byte[] data, int offset, int length) throws IOException;