changeset 1066:bea08c5fe560

UnidirectionalRawPacketChannelContext, UnidirectionalRawPacketEmitter, UnidirectionalRawPacketWorker
author Devel 2
date Mon, 27 Apr 2020 12:36:42 +0200
parents 53e6b033a0d2
children 2f85c6a344c5
files stress-tester/src/main/java/com/passus/st/emitter/UnidirectionalTasks.java stress-tester/src/main/java/com/passus/st/emitter/pcap/PcapOutput.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapOutput.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorkerTest.java stress-tester/src/test/java/com/passus/st/utils/server/NioServer.java
diffstat 17 files changed, 1263 insertions(+), 1016 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/UnidirectionalTasks.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.emitter;
+
+public class UnidirectionalTasks {
+    /** Base for tasks bound to one session; the task code is passed on to {@code Task}. */
+    public static class UnidirectionalTask extends Task {
+        /** Session this task refers to. */
+        public final SessionInfo sessionInfo;
+
+        public UnidirectionalTask(int code, SessionInfo sessionInfo) {
+            super(code);
+            this.sessionInfo = sessionInfo;
+        }
+    }
+    /** Task created with code {@code CONNECT}; additionally carries an {@link EmitterHandler}. */
+    public final static class ConnectTask extends UnidirectionalTask {
+        /** Handler supplied together with the connect request. */
+        public final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) {
+            super(CONNECT, sessionInfo);
+            this.handler = handler;
+        }
+
+    }
+    /** Task created with code {@code CLOSE} for the given session. */
+    public final static class CloseTask extends UnidirectionalTask {
+
+        public CloseTask(SessionInfo sessionInfo) {
+            super(CLOSE, sessionInfo);
+        }
+
+    }
+    /** Task created with code {@code FLUSH} for the given session. */
+    public final static class FlushTask extends UnidirectionalTask {
+
+        public FlushTask(SessionInfo sessionInfo) {
+            super(FLUSH, sessionInfo);
+        }
+
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/PcapOutput.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,78 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.pcap.Pcap;
-import com.passus.pcap.PcapDumper;
-import com.passus.pcap.PcapLinkType;
-
-/**
- *
- * @author mikolaj.podbielski
- */
-public interface PcapOutput {
-
-    Pcap pcap();
-
-    int sendPacket(int length, byte[] payload);
-
-    void close();
-
-    public static class Sender implements PcapOutput {
-
-        private final Pcap pcap;
-
-        public Sender(Pcap pcap) {
-            this.pcap = pcap;
-        }
-
-        @Override
-        public Pcap pcap() {
-            return pcap;
-        }
-
-        @Override
-        public int sendPacket(int length, byte[] payload) {
-            return pcap.sendPacket(length, payload);
-        }
-
-        @Override
-        public void close() {
-            pcap.close();
-        }
-    }
-
-    public static class Writer implements PcapOutput {
-
-        private final Pcap pcap;
-        private final PcapDumper dumper;
-
-        public Writer(Pcap pcap, PcapDumper dumper) {
-            this.pcap = pcap;
-            this.dumper = dumper;
-        }
-
-        @Override
-        public Pcap pcap() {
-            return pcap;
-        }
-
-        @Override
-        public int sendPacket(int length, byte[] payload) {
-            dumper.dump(System.currentTimeMillis(), length, payload);
-            return 0;
-        }
-
-        @Override
-        public void close() {
-            dumper.close();
-            pcap.close();
-        }
-
-    }
-
-    public static Writer writer(String file) {
-        Pcap pcap = Pcap.openDead(PcapLinkType.DLT_EN10MB, 65536);
-        StringBuilder sb = new StringBuilder();
-        PcapDumper pd = PcapDumper.open(pcap, "", sb);
-        return new Writer(pcap, pd);
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,157 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.data.ByteBuff;
-import com.passus.net.MACAddress;
-import com.passus.net.SocketAddress;
-import com.passus.st.client.FlowContext;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Queue;
-
-public class UnidirectionalPcapChannelContext implements ChannelContext {
-
-    private static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
-
-    private final UnidirectionalPcapWorker worker;
-
-    private PcapOutput pcapOut;
-
-    final Queue<byte[]> dataQueue;
-
-    final byte[] buffer;
-
-    private FlowContext flowContext;
-
-    private final EmitterHandler handler;
-
-    private SessionInfo sessionInfo;
-
-    private final SocketAddress localAddress;
-
-    private final SocketAddress remoteAddress;
-
-    private final MACAddress localHardwareAddress;
-
-    private final MACAddress remoteHardwareAddress;
-
-    private String device;
-
-    public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker,
-                                            EmitterHandler handler, SessionInfo sessionInfo,
-                                            SocketAddress localAddress, SocketAddress remoteAddress,
-                                            MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) {
-        this.worker = worker;
-        this.handler = handler;
-        this.dataQueue = new LinkedList<>();
-        this.sessionInfo = sessionInfo;
-        this.localAddress = localAddress;
-        this.remoteAddress = remoteAddress;
-        this.localHardwareAddress = localHardwareAddress;
-        this.remoteHardwareAddress = remoteHardwareAddress;
-        this.buffer = new byte[DEFAULT_BUFFER_SIZE];
-    }
-
-    PcapOutput getPcapOut() {
-        return pcapOut;
-    }
-
-    void setPcapOut(PcapOutput pcapOut) {
-        this.pcapOut = pcapOut;
-    }
-
-    String getDevice() {
-        return device;
-    }
-
-    void setDevice(String device) {
-        this.device = device;
-    }
-
-    @Override
-    public SocketAddress getLocalAddress() {
-        return localAddress;
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return remoteAddress;
-    }
-
-    @Override
-    public SessionInfo getSessionInfo() {
-        return sessionInfo;
-    }
-
-    public EmitterHandler getHandler() {
-        return handler;
-    }
-
-    public MACAddress getLocalHardwareAddress() {
-        return localHardwareAddress;
-    }
-
-    public MACAddress getRemoteHardwareAddress() {
-        return remoteHardwareAddress;
-    }
-
-    @Override
-    public void setFlowContext(FlowContext attachment) {
-        this.flowContext = attachment;
-    }
-
-    @Override
-    public FlowContext getFlowContext() {
-        return flowContext;
-    }
-
-    @Override
-    public boolean isBidirectional() {
-        return false;
-    }
-
-    @Override
-    public void setBidirectional(boolean bidirectional) {
-        if (bidirectional) {
-            throw new IllegalArgumentException("Bidirectional communication not supported.");
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return true;
-    }
-
-    @Override
-    public boolean isConnectionPending() {
-        return false;
-    }
-
-
-    @Override
-    public void write(byte[] data, int offset, int length) throws IOException {
-        byte[] out = new byte[length];
-        System.arraycopy(data, offset, out, 0, length);
-        dataQueue.add(out);
-    }
-
-    @Override
-    public void write(ByteBuff data) throws IOException {
-        dataQueue.add(data.toArray());
-    }
-
-    @Override
-    public void flush() throws IOException {
-        worker.flush(sessionInfo);
-    }
-
-    @Override
-    public void close() throws IOException {
-        worker.close(sessionInfo);
-    }
-
-
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,401 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.commons.Assert;
-import com.passus.net.IpAddress;
-import com.passus.net.MACAddress;
-import com.passus.net.SocketAddress;
-import com.passus.pcap.Pcap;
-import com.passus.pcap.PcapIfc;
-import com.passus.st.emitter.*;
-import com.passus.st.emitter.SessionMapper.ConnectionParams;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.ConnectTask;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask;
-import com.passus.st.metric.MetricSource;
-import com.passus.st.metric.MetricsContainer;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress;
-import static com.passus.st.emitter.EmitterUtils.getConnectionParams;
-import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
-
-class UnidirectionalPcapWorker extends Thread implements MetricSource {
-
-    protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0};
-
-    protected final Logger LOGGER = LogManager.getLogger(UnidirectionalPcapWorker.class);
-
-    boolean working = true;
-
-    protected final BlockingQueue<UnidirectionalTask> tasks = new LinkedBlockingQueue<>();
-
-    private Map<SessionInfo, UnidirectionalPcapChannelContext> sessions = new HashMap<>();
-
-    private SessionMapper sessionMapper;
-
-    private volatile boolean collectMetrics = false;
-
-    private EmitterMetric metric;
-
-    private PortPool portPool = new PortPoolImpl();
-
-    private MACAddressResolver macResolver;
-
-    private final Map<String, PcapInstance> pcaps = new HashMap<>();
-
-    public PortPool getPortPool() {
-        return portPool;
-    }
-
-    public void setPortPool(PortPool portPool) {
-        Assert.notNull(portPool, "portPool");
-        this.portPool = portPool;
-    }
-
-    public MACAddressResolver getMacResolver() {
-        return macResolver;
-    }
-
-    public void setMacResolver(MACAddressResolver macResolver) {
-        Assert.notNull(macResolver, "macResolver");
-        this.macResolver = macResolver;
-    }
-
-    public SessionMapper getSessionMapper() {
-        return sessionMapper;
-    }
-
-    public void setSessionMapper(SessionMapper sessionMapper) {
-        Assert.notNull(sessionMapper, "sessionMapper");
-        this.sessionMapper = sessionMapper;
-    }
-
-    @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        this.collectMetrics = collectMetrics;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        if (collectMetrics) {
-            container.update(System.currentTimeMillis(), metric);
-            metric.reset();
-        }
-    }
-
-    private void doCatchException(UnidirectionalPcapChannelContext channelContext, Throwable cause) {
-        EmitterHandler handler = channelContext == null ? null : channelContext.getHandler();
-        EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER);
-    }
-
-    private NetworkInterface findFirstUpInterface() throws IOException {
-        NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface();
-        if (ni == null) {
-            throw new IOException("Unable to find network interface.");
-        }
-
-        return ni;
-    }
-
-    private NetworkInterface findInterface(IpAddress address) throws IOException {
-        NetworkInterface ni = EmitterUtils.findNetworkInterface(address);
-        if (ni == null) {
-            throw new IOException("Unable to find network interface.");
-        }
-
-        return ni;
-    }
-
-    boolean containsContext(SessionInfo sessionInfo) {
-        return sessions.containsKey(sessionInfo);
-    }
-
-    private UnidirectionalPcapChannelContext findContext(SessionInfo sessionInfo) {
-        UnidirectionalPcapChannelContext context = sessions.get(sessionInfo);
-        if (context == null) {
-            throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo);
-        }
-
-        return context;
-    }
-
-    private int nextFreeLocalPort() throws IOException {
-        int port = portPool.borrow();
-        if (port == -1) {
-            throw new IOException("Bind error. Unable to get local port.");
-        }
-
-        return port;
-    }
-
-    void connect(SessionInfo sessionInfo, EmitterHandler handler) {
-        tasks.add(new ConnectTask(sessionInfo, handler));
-    }
-
-    void flush(SessionInfo sessionInfo) {
-        tasks.add(new FlushTask(sessionInfo));
-    }
-
-    void close(SessionInfo sessionInfo) {
-        tasks.add(new CloseTask(sessionInfo));
-    }
-
-    private String getPcapDeviceName(NetworkInterface ifc) throws SocketException {
-        //Windows workaround
-        if (SystemUtils.IS_OS_WINDOWS) {
-            byte[] ifcMac = ifc.getHardwareAddress();
-            StringBuilder sb = new StringBuilder();
-            PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb);
-            for (int i = 0; i < pcapIfcs.length; i++) {
-                PcapIfc pcapIfc = pcapIfcs[i];
-                byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name);
-                if (Arrays.equals(ifcMac, pcapIfcMac)) {
-                    return pcapIfc.name;
-                }
-            }
-        }
-
-        return ifc.getName();
-    }
-
-    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
-        ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER);
-        if (connParams == null) {
-            return;
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
-        }
-
-        UnidirectionalPcapChannelContext channelContext = null;
-
-        String device;
-        NetworkInterface networkInterface;
-        SocketAddress localAddress = connParams.getBindAddress();
-        MACAddress localMac;
-        try {
-            if (localAddress == null || ANY_SOCKET.equals(localAddress)) {
-                networkInterface = findFirstUpInterface();
-                InetAddress inetAddress = networkInterface.getInetAddresses().nextElement();
-                int port = nextFreeLocalPort();
-                localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), port);
-            } else {
-                networkInterface = findInterface(localAddress.getIp());
-            }
-
-            byte[] localHwAddress = networkInterface.getHardwareAddress();
-            if (localHwAddress == null) {
-                localHwAddress = ZERO_MAC;
-            }
-            localMac = new MACAddress(localHwAddress);
-            device = getPcapDeviceName(networkInterface);
-            LOGGER.debug("Found device " + device + " for ifc " + networkInterface);
-        } catch (IOException ex) {
-            doCatchException(channelContext, ex);
-            return;
-        }
-
-        SocketAddress remoteAddress = connParams.getRemoteAddress();
-        if (remoteAddress == null) {
-            remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
-        }
-
-        MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp());
-        channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac);
-        sessions.put(sessionInfo, channelContext);
-
-        try {
-            handler.channelRegistered(channelContext);
-        } catch (Exception ex) {
-            doCatchException(channelContext, ex);
-        }
-
-        try {
-            PcapInstance pcapInstance = pcaps.get(device);
-            if (pcapInstance == null) {
-                StringBuilder sb = new StringBuilder();
-                Pcap pcap = Pcap.openLive(device, sb);
-                if (pcap == null) {
-                    doClose(sessionInfo);
-                    throw new IOException("Unable to open pcap on device '" + device + ".' " + sb);
-                }
-
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Pcap instance for device {} created.", device);
-                }
-
-                pcapInstance = new PcapInstance(new PcapOutput.Sender(pcap));
-                pcaps.put(device, pcapInstance);
-            }
-
-            channelContext.setPcapOut(pcapInstance.pcap);
-            channelContext.setDevice(device);
-            pcapInstance.borrows++;
-
-            handler.channelActive(channelContext);
-        } catch (Exception ex) {
-            doCatchException(channelContext, ex);
-        }
-    }
-
-    private void doWrite(SessionInfo sessionInfo) {
-        UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo);
-        if (channelContext == null) {
-            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
-            return;
-        }
-
-        Queue<byte[]> queue = channelContext.dataQueue;
-        if (queue.isEmpty()) {
-            return;
-        }
-
-        EmitterHandler handler = channelContext.getHandler();
-        handler.dataWriteStart(channelContext);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
-        }
-
-        int written = 0;
-        try {
-            PcapOutput pcap = channelContext.getPcapOut();
-
-            while (!queue.isEmpty()) {
-                byte[] buf = queue.poll();
-                int res = pcap.sendPacket(buf.length, buf);
-                if (res == 0) {
-                    written += buf.length;
-                } else {
-                    throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr());
-                }
-            }
-        } catch (Exception e) {
-            doCatchException(channelContext, e);
-            doClose(channelContext);
-            return;
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress());
-        }
-
-        try {
-            handler.dataWritten(channelContext);
-            LOGGER.debug("Write handled.");
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-    }
-
-    private void doClose(SessionInfo sessionInfo) {
-        UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo);
-        if (channelContext == null) {
-            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
-            return;
-        }
-
-        doClose(channelContext);
-    }
-
-    private void doClose(UnidirectionalPcapChannelContext channelContext) {
-        SessionInfo sessionInfo = channelContext.getSessionInfo();
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Closing session '" + sessionInfo + "'.");
-        }
-
-        EmitterHandler handler = channelContext.getHandler();
-        portPool.release(sessionInfo.getSrcPort());
-
-        String device = channelContext.getDevice();
-        PcapInstance pcapInstance = pcaps.get(device);
-        if (pcapInstance != null) {
-            pcapInstance.borrows--;
-            if (pcapInstance.borrows == 0) {
-                pcapInstance.pcap.close();
-                pcaps.remove(device);
-            }
-        }
-
-        try {
-            handler.channelInactive(channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        sessions.remove(sessionInfo);
-        try {
-            handler.channelUnregistered(channelContext);
-        } catch (Exception e) {
-            LOGGER.debug(e.getMessage(), e);
-        }
-
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
-        }
-
-        if (collectMetrics) {
-            metric.incClosedConnections();
-        }
-    }
-
-    @Override
-    public void run() {
-        while (working) {
-            UnidirectionalTask task = null;
-            try {
-                task = tasks.take();
-            } catch (InterruptedException ignore) {
-
-            }
-
-            if (task != null) {
-                switch (task.code) {
-                    case Task.CONNECT:
-                        ConnectTask connectTask = (ConnectTask) task;
-                        doConnect(connectTask.sessionInfo, connectTask.handler);
-                        break;
-                    case Task.FLUSH:
-                        FlushTask flushTask = (FlushTask) task;
-                        doWrite(flushTask.sessionInfo);
-                        break;
-                    case Task.CLOSE:
-                        CloseTask closeTask = (CloseTask) task;
-                        doClose(closeTask.sessionInfo);
-                        break;
-                }
-            }
-
-        }
-    }
-
-    private static class PcapInstance {
-
-        private final PcapOutput pcap;
-
-        private int borrows;
-
-        public PcapInstance(PcapOutput pcap) {
-            this.pcap = pcap;
-        }
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,45 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.Task;
-
-public class UnidirectionalTasks {
-
-    protected static class UnidirectionalTask extends Task {
-
-        final SessionInfo sessionInfo;
-
-        public UnidirectionalTask(int code, SessionInfo sessionInfo) {
-            super(code);
-            this.sessionInfo = sessionInfo;
-        }
-    }
-
-    protected final static class ConnectTask extends UnidirectionalTask {
-
-        final EmitterHandler handler;
-
-        public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) {
-            super(CONNECT, sessionInfo);
-            this.handler = handler;
-        }
-
-    }
-
-    protected final static class CloseTask extends UnidirectionalTask {
-
-        public CloseTask(SessionInfo sessionInfo) {
-            super(CLOSE, sessionInfo);
-        }
-
-    }
-
-    protected final static class FlushTask extends UnidirectionalTask {
-
-        public FlushTask(SessionInfo sessionInfo) {
-            super(FLUSH, sessionInfo);
-        }
-
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,136 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.commons.Assert;
-import com.passus.net.session.Session;
-import com.passus.st.emitter.*;
-import com.passus.st.metric.MetricsContainer;
-
-import java.io.IOException;
-
-public class UnidirectionalUdpPcapEmitter implements Emitter {
-
-    private static final int DEFAULT_WORKERS_NUM = 4;
-
-    private boolean started = false;
-
-    private UnidirectionalPcapWorker[] workers;
-
-    private int workersNum = DEFAULT_WORKERS_NUM;
-
-    private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl());
-
-    private MACAddressResolver macResolver;
-
-    private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
-
-    private boolean collectMetrics = false;
-
-    @Override
-    public void setSessionMapper(SessionMapper sessionMapper) {
-        this.sessionMapper = sessionMapper;
-    }
-
-    @Override
-    public SessionMapper getSessionMapper() {
-        return sessionMapper;
-    }
-
-    public int getWorkersNum() {
-        return workersNum;
-    }
-
-    public void setWorkersNum(int workersNum) {
-        Assert.greaterThanZero(workersNum, "workersNum");
-        this.workersNum = workersNum;
-    }
-
-    public MACAddressResolver getMacResolver() {
-        return macResolver;
-    }
-
-    public void setMacResolver(MACAddressResolver macResolver) {
-        this.macResolver = macResolver;
-    }
-
-    public PortPool getPortPool() {
-        return portPool;
-    }
-
-    public void setPortPool(PortPool portPool) {
-        Assert.notNull(portPool, "portPool");
-        this.portPool = new SynchronizedPortPoolWrapper(portPool);
-    }
-
-    @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        this.collectMetrics = collectMetrics;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        throw new RuntimeException("Not implemented yet.");
-    }
-
-    @Override
-    public boolean isStarted() {
-        return started;
-    }
-
-    @Override
-    public void start() {
-        if (started) {
-            return;
-        }
-
-        workers = new UnidirectionalPcapWorker[workersNum];
-        for (int i = 0; i < workersNum; i++) {
-            UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker();
-            worker.setPortPool(portPool);
-            worker.setMacResolver(macResolver);
-            worker.setSessionMapper(sessionMapper);
-            worker.start();
-            workers[i] = worker;
-        }
-
-        started = true;
-    }
-
-    @Override
-    public void stop() {
-        if (!started) {
-            return;
-        }
-
-        for (int i = 0; i < workersNum; i++) {
-            UnidirectionalPcapWorker worker = workers[i];
-            worker.working = false;
-            worker.interrupt();
-
-            try {
-                worker.join();
-            } catch (InterruptedException ignore) {
-
-            }
-        }
-
-        workers = null;
-        started = false;
-    }
-
-    @Override
-    public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
-        if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) {
-            throw new IllegalArgumentException("Only UDP transport supported.");
-        }
-
-        int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
-        int index = hashCode % workers.length;
-        workers[index].connect(sessionInfo, handler);
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapOutput.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,78 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.pcap.Pcap;
+import com.passus.pcap.PcapDumper;
+import com.passus.pcap.PcapLinkType;
+
+/**
+ * Destination for raw packets: a live pcap handle ({@link Sender}) or a pcap dumper ({@link Writer}).
+ * @author mikolaj.podbielski
+ */
+public interface PcapOutput {
+    /** Returns the underlying pcap handle. */
+    Pcap pcap();
+    /** Sends {@code length} bytes of {@code payload}; returns a status code ({@link Writer} always returns 0). */
+    int sendPacket(int length, byte[] payload);
+    /** Releases the underlying resources. */
+    void close();
+    /** Output that forwards packets to the pcap handle via {@code Pcap.sendPacket}. */
+    public static class Sender implements PcapOutput {
+
+        private final Pcap pcap;
+
+        public Sender(Pcap pcap) {
+            this.pcap = pcap;
+        }
+
+        @Override
+        public Pcap pcap() {
+            return pcap;
+        }
+
+        @Override
+        public int sendPacket(int length, byte[] payload) {
+            return pcap.sendPacket(length, payload);
+        }
+
+        @Override
+        public void close() {
+            pcap.close();
+        }
+    }
+    /** Output that hands packets to a {@link PcapDumper}, stamped with the current wall-clock time. */
+    public static class Writer implements PcapOutput {
+
+        private final Pcap pcap;
+        private final PcapDumper dumper;
+
+        public Writer(Pcap pcap, PcapDumper dumper) {
+            this.pcap = pcap;
+            this.dumper = dumper;
+        }
+
+        @Override
+        public Pcap pcap() {
+            return pcap;
+        }
+
+        @Override
+        public int sendPacket(int length, byte[] payload) {
+            dumper.dump(System.currentTimeMillis(), length, payload);
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            dumper.close();
+            pcap.close();
+        }
+
+    }
+    /** Creates a {@link Writer} dumping to {@code file} (previously ignored in favour of "") via a dead DLT_EN10MB handle. */
+    public static Writer writer(String file) {
+        Pcap pcap = Pcap.openDead(PcapLinkType.DLT_EN10MB, 65536);
+        StringBuilder sb = new StringBuilder();
+        PcapDumper pd = PcapDumper.open(pcap, file, sb);
+        return new Writer(pcap, pd);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,14 @@
+package com.passus.st.emitter.raw;
+
+public class PcapUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<PcapOutput> {
+    /** Allocates the worker array with {@code workersNum} slots, typed to the pcap worker class. */
+    @Override
+    protected UnidirectionalRawPacketWorker<PcapOutput>[] createWorkersArray(int workersNum) {
+        return new PcapUnidirectionalRawPacketWorker[workersNum];
+    }
+    /** Creates one new {@link PcapUnidirectionalRawPacketWorker}. */
+    @Override
+    protected UnidirectionalRawPacketWorker<PcapOutput> createWorker() {
+        return new PcapUnidirectionalRawPacketWorker();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,101 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.pcap.Pcap;
+import com.passus.pcap.PcapIfc;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.IOException;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Worker that emits raw packets through pcap, sharing one {@link PcapOutput} per device.
+ * <p>
+ * Every channel whose engine was initialised on a device holds one "borrow" of that device's
+ * shared output; the output is closed when the last borrow is returned.
+ */
+class PcapUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<PcapOutput> {
+
+    /** Open outputs keyed by pcap device name. */
+    private final Map<String, PcapInstance> pcaps = new HashMap<>();
+
+    /**
+     * Maps a JDK network interface to the device name understood by pcap.
+     * <p>
+     * On Windows the pcap device whose hardware address equals the interface's address is
+     * returned; in every other case the interface name is used as is.
+     *
+     * @param ifc interface to resolve
+     * @return pcap device name
+     * @throws SocketException if the hardware address of {@code ifc} cannot be read
+     */
+    @Override
+    protected String resolveDevice(NetworkInterface ifc) throws SocketException {
+        //Windows workaround
+        if (SystemUtils.IS_OS_WINDOWS) {
+            byte[] ifcMac = ifc.getHardwareAddress();
+            StringBuilder sb = new StringBuilder();
+            PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb);
+            // NOTE(review): assumes a failed device listing may yield null - confirm against Pcap.
+            if (pcapIfcs != null) {
+                for (PcapIfc pcapIfc : pcapIfcs) {
+                    byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name);
+                    // NOTE(review): Arrays.equals(null, null) is true, so an interface without a
+                    // hardware address matches the first pcap device that has none either.
+                    if (Arrays.equals(ifcMac, pcapIfcMac)) {
+                        return pcapIfc.name;
+                    }
+                }
+            }
+        }
+
+        return ifc.getName();
+    }
+
+    /**
+     * Returns the shared output for {@code device}, opening a live pcap handle on first use,
+     * and registers one borrow for the calling channel.
+     *
+     * @param channelContext channel that will use the output
+     * @param device         pcap device name
+     * @return output shared by all channels on {@code device}
+     * @throws IOException if the pcap handle cannot be opened; the session is closed first
+     */
+    @Override
+    protected PcapOutput doInitEngine(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext, String device) throws IOException {
+        PcapInstance pcapInstance = pcaps.get(device);
+        if (pcapInstance == null) {
+            StringBuilder sb = new StringBuilder();
+            Pcap pcap = Pcap.openLive(device, sb);
+            if (pcap == null) {
+                doClose(channelContext.getSessionInfo());
+                throw new IOException("Unable to open pcap on device '" + device + "'. " + sb);
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Pcap instance for device {} created.", device);
+            }
+
+            pcapInstance = new PcapInstance(new PcapOutput.Sender(pcap));
+            pcaps.put(device, pcapInstance);
+        }
+
+        // Count this channel as a user of the shared output. Without the increment the
+        // decrement in doClose0 drives the counter negative and the handle is never closed.
+        pcapInstance.borrows++;
+        return pcapInstance.pcap;
+    }
+
+    /**
+     * Sends every queued packet of the channel through its output.
+     *
+     * @param channelContext channel whose data queue is drained
+     * @return total number of payload bytes handed to pcap
+     * @throws IOException if pcap reports a non-zero status for a packet
+     */
+    @Override
+    protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) throws IOException {
+        PcapOutput pcap = channelContext.getEngine();
+        Queue<byte[]> queue = channelContext.getDataQueue();
+        int written = 0;
+        while (!queue.isEmpty()) {
+            byte[] buf = queue.poll();
+            int res = pcap.sendPacket(buf.length, buf);
+            if (res != 0) {
+                throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr());
+            }
+            written += buf.length;
+        }
+
+        return written;
+    }
+
+    /**
+     * Returns the channel's borrow of its device output and closes the output once nobody
+     * uses it any more. Channels without an initialised device are ignored.
+     *
+     * @param channelContext channel being closed
+     */
+    @Override
+    protected void doClose0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) {
+        String device = channelContext.getDevice();
+        PcapInstance pcapInstance = pcaps.get(device);
+        if (pcapInstance == null) {
+            return;
+        }
+
+        pcapInstance.borrows--;
+        if (pcapInstance.borrows <= 0) {
+            // Drop the map entry first so a failing close() cannot leave a stale handle behind.
+            pcaps.remove(device);
+            pcapInstance.pcap.close();
+        }
+    }
+
+    /** Shared output together with the number of channels currently using it. */
+    private static class PcapInstance {
+
+        private final PcapOutput pcap;
+
+        private int borrows;
+
+        public PcapInstance(PcapOutput pcap) {
+            this.pcap = pcap;
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,160 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.MACAddress;
+import com.passus.net.SocketAddress;
+import com.passus.st.client.FlowContext;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * {@link ChannelContext} of a one-way raw-packet session owned by a
+ * {@link UnidirectionalRawPacketWorker}.
+ * <p>
+ * The {@code write} methods only queue payloads; {@link #flush()} and {@link #close()} forward
+ * the request to the owning worker.
+ *
+ * @param <E> type of the engine object attached to this context
+ */
+public class UnidirectionalRawPacketChannelContext<E> implements ChannelContext {
+
+    private static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
+
+    /** Payloads waiting to be written, in insertion order. */
+    final Queue<byte[]> dataQueue = new LinkedList<>();
+
+    /** Scratch buffer of {@link #DEFAULT_BUFFER_SIZE} bytes; not used inside this class. */
+    final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+    private final UnidirectionalRawPacketWorker<E> worker;
+
+    private final EmitterHandler handler;
+
+    private final SocketAddress localAddress;
+
+    private final SocketAddress remoteAddress;
+
+    private final MACAddress localHardwareAddress;
+
+    private final MACAddress remoteHardwareAddress;
+
+    private SessionInfo sessionInfo;
+
+    private FlowContext flowContext;
+
+    private E engine;
+
+    private String device;
+
+    /**
+     * @param worker                worker that receives flush and close requests
+     * @param handler               handler associated with the channel
+     * @param sessionInfo           session this context belongs to
+     * @param localAddress          local socket address
+     * @param remoteAddress         remote socket address
+     * @param localHardwareAddress  local MAC address
+     * @param remoteHardwareAddress remote MAC address
+     */
+    public UnidirectionalRawPacketChannelContext(UnidirectionalRawPacketWorker<E> worker,
+                                                 EmitterHandler handler, SessionInfo sessionInfo,
+                                                 SocketAddress localAddress, SocketAddress remoteAddress,
+                                                 MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) {
+        this.worker = worker;
+        this.handler = handler;
+        this.sessionInfo = sessionInfo;
+        this.localAddress = localAddress;
+        this.localHardwareAddress = localHardwareAddress;
+        this.remoteAddress = remoteAddress;
+        this.remoteHardwareAddress = remoteHardwareAddress;
+    }
+
+    /** Returns the engine attached with {@link #setEngine(Object)}, or {@code null} if none was set. */
+    public E getEngine() {
+        return engine;
+    }
+
+    void setEngine(E engine) {
+        this.engine = engine;
+    }
+
+    /** Returns the device name set with {@link #setDevice(String)}, or {@code null} if none was set. */
+    public String getDevice() {
+        return device;
+    }
+
+    void setDevice(String device) {
+        this.device = device;
+    }
+
+    /** Returns the live queue of pending payloads (not a copy). */
+    public Queue<byte[]> getDataQueue() {
+        return dataQueue;
+    }
+
+    public EmitterHandler getHandler() {
+        return handler;
+    }
+
+    public MACAddress getLocalHardwareAddress() {
+        return localHardwareAddress;
+    }
+
+    public MACAddress getRemoteHardwareAddress() {
+        return remoteHardwareAddress;
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return remoteAddress;
+    }
+
+    @Override
+    public SessionInfo getSessionInfo() {
+        return sessionInfo;
+    }
+
+    @Override
+    public FlowContext getFlowContext() {
+        return flowContext;
+    }
+
+    @Override
+    public void setFlowContext(FlowContext attachment) {
+        flowContext = attachment;
+    }
+
+    /** Always {@code false}: this channel only sends. */
+    @Override
+    public boolean isBidirectional() {
+        return false;
+    }
+
+    /**
+     * Accepts only {@code false}.
+     *
+     * @throws IllegalArgumentException if {@code bidirectional} is {@code true}
+     */
+    @Override
+    public void setBidirectional(boolean bidirectional) {
+        if (!bidirectional) {
+            return;
+        }
+
+        throw new IllegalArgumentException("Bidirectional communication not supported.");
+    }
+
+    /** Always {@code true}. */
+    @Override
+    public boolean isConnected() {
+        return true;
+    }
+
+    /** Always {@code false}. */
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+    /** Queues a private copy of {@code length} bytes of {@code data} starting at {@code offset}. */
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        final byte[] copy = new byte[length];
+        System.arraycopy(data, offset, copy, 0, length);
+        dataQueue.add(copy);
+    }
+
+    /** Queues the array returned by {@code data.toArray()}. */
+    @Override
+    public void write(ByteBuff data) throws IOException {
+        final byte[] bytes = data.toArray();
+        dataQueue.add(bytes);
+    }
+
+    /** Asks the owning worker to flush this session. */
+    @Override
+    public void flush() throws IOException {
+        worker.flush(sessionInfo);
+    }
+
+    /** Asks the owning worker to close this session. */
+    @Override
+    public void close() throws IOException {
+        worker.close(sessionInfo);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,140 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.commons.Assert;
+import com.passus.net.session.Session;
+import com.passus.st.emitter.*;
+import com.passus.st.metric.MetricsContainer;
+
+import java.io.IOException;
+
+/**
+ * Base {@link Emitter} that distributes UDP sessions over a set of
+ * {@link UnidirectionalRawPacketWorker} threads.
+ * <p>
+ * Subclasses supply the concrete worker type. Settings changed after {@link #start()} only
+ * affect workers created by a later start.
+ *
+ * @param <E> engine type used by the workers
+ */
+public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter {
+
+    private static final int DEFAULT_WORKERS_NUM = 4;
+
+    private boolean started = false;
+
+    private UnidirectionalRawPacketWorker<E>[] workers;
+
+    private int workersNum = DEFAULT_WORKERS_NUM;
+
+    private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl());
+
+    private MACAddressResolver macResolver;
+
+    private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
+
+    private boolean collectMetrics = false;
+
+    @Override
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public int getWorkersNum() {
+        return workersNum;
+    }
+
+    /**
+     * Sets the number of workers created by the next {@link #start()}.
+     *
+     * @param workersNum number of workers, must be greater than zero
+     */
+    public void setWorkersNum(int workersNum) {
+        Assert.greaterThanZero(workersNum, "workersNum");
+        this.workersNum = workersNum;
+    }
+
+    public MACAddressResolver getMacResolver() {
+        return macResolver;
+    }
+
+    public void setMacResolver(MACAddressResolver macResolver) {
+        this.macResolver = macResolver;
+    }
+
+    public PortPool getPortPool() {
+        return portPool;
+    }
+
+    /**
+     * Sets the port pool shared by all workers; it is wrapped in a
+     * {@link SynchronizedPortPoolWrapper}.
+     *
+     * @param portPool pool to use, must not be {@code null}
+     */
+    public void setPortPool(PortPool portPool) {
+        Assert.notNull(portPool, "portPool");
+        this.portPool = new SynchronizedPortPoolWrapper(portPool);
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /**
+     * Not supported by this emitter.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        // UnsupportedOperationException is a RuntimeException, so existing catch blocks still apply.
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /** Allocates the array that holds {@code workersNum} workers. */
+    protected abstract UnidirectionalRawPacketWorker<E>[] createWorkersArray(int workersNum);
+
+    /** Creates one worker; it is configured and started by {@link #start()}. */
+    protected abstract UnidirectionalRawPacketWorker<E> createWorker();
+
+    /** Creates, configures and starts the workers. Does nothing when already started. */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        workers = createWorkersArray(workersNum);
+        for (int i = 0; i < workers.length; i++) {
+            UnidirectionalRawPacketWorker<E> worker = createWorker();
+            worker.setPortPool(portPool);
+            worker.setMacResolver(macResolver);
+            worker.setSessionMapper(sessionMapper);
+            worker.start();
+            workers[i] = worker;
+        }
+
+        started = true;
+    }
+
+    /**
+     * Stops every worker and waits for it to terminate. Does nothing when not started.
+     * <p>
+     * If the calling thread is interrupted while waiting, the remaining workers are still
+     * stopped and the interrupt status is restored before returning.
+     */
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        boolean interrupted = false;
+        // Iterate over the workers that were actually started; workersNum may have been
+        // changed through setWorkersNum() since start().
+        for (UnidirectionalRawPacketWorker<E> worker : workers) {
+            if (worker == null) {
+                continue;
+            }
+
+            worker.working = false;
+            worker.interrupt();
+
+            try {
+                worker.join();
+            } catch (InterruptedException e) {
+                // Keep shutting down the other workers; re-assert the interrupt afterwards.
+                interrupted = true;
+            }
+        }
+
+        workers = null;
+        started = false;
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Hands the session over to one of the workers, chosen from the session's hash code
+     * and {@code workerIndex}.
+     *
+     * @param sessionInfo session to connect, must use UDP transport
+     * @param handler     handler notified about channel events
+     * @param workerIndex offset added to the session hash before picking the worker
+     * @throws IllegalArgumentException if the session transport is not UDP
+     * @throws IllegalStateException    if the emitter is not started
+     */
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
+        if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) {
+            throw new IllegalArgumentException("Only UDP transport supported.");
+        }
+
+        UnidirectionalRawPacketWorker<E>[] current = workers;
+        if (current == null) {
+            throw new IllegalStateException("Emitter is not started.");
+        }
+
+        // Mask the sign bit so the modulo never yields a negative index.
+        int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
+        int index = hashCode % current.length;
+        current[index].connect(sessionInfo, handler);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,342 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.commons.Assert;
+import com.passus.net.IpAddress;
+import com.passus.net.MACAddress;
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.*;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress;
+import static com.passus.st.emitter.EmitterUtils.getConnectionParams;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+/**
+ * Thread that serves connect, flush and close requests for one-way raw-packet sessions.
+ * <p>
+ * Requests are queued by {@link #connect}, {@link #flush} and {@link #close} and executed one
+ * at a time by {@link #run()}. Subclasses provide device resolution, engine creation and the
+ * actual write and close operations.
+ *
+ * @param <E> engine type attached to every channel context
+ */
+public abstract class UnidirectionalRawPacketWorker<E> extends Thread implements MetricSource {
+
+    protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0};
+
+    protected final Logger LOGGER = LogManager.getLogger(UnidirectionalRawPacketWorker.class);
+
+    // Cleared by the thread that stops the worker and read by the run() loop, hence volatile.
+    volatile boolean working = true;
+
+    protected final BlockingQueue<UnidirectionalTasks.UnidirectionalTask> tasks = new LinkedBlockingQueue<>();
+
+    private final Map<SessionInfo, UnidirectionalRawPacketChannelContext<E>> sessions = new HashMap<>();
+
+    private SessionMapper sessionMapper;
+
+    private volatile boolean collectMetrics = false;
+
+    // NOTE(review): nothing in this class assigns the metric, so it stays null; every use is
+    // guarded by metricsEnabled() to avoid a NullPointerException when collectMetrics is true.
+    private EmitterMetric metric;
+
+    private PortPool portPool = new PortPoolImpl();
+
+    private MACAddressResolver macResolver;
+
+    public PortPool getPortPool() {
+        return portPool;
+    }
+
+    public void setPortPool(PortPool portPool) {
+        Assert.notNull(portPool, "portPool");
+        this.portPool = portPool;
+    }
+
+    public MACAddressResolver getMacResolver() {
+        return macResolver;
+    }
+
+    public void setMacResolver(MACAddressResolver macResolver) {
+        Assert.notNull(macResolver, "macResolver");
+        this.macResolver = macResolver;
+    }
+
+    public SessionMapper getSessionMapper() {
+        return sessionMapper;
+    }
+
+    public void setSessionMapper(SessionMapper sessionMapper) {
+        Assert.notNull(sessionMapper, "sessionMapper");
+        this.sessionMapper = sessionMapper;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /** Publishes and resets the metric; does nothing unless metrics are enabled and available. */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (metricsEnabled()) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset();
+        }
+    }
+
+    /** True only when collection is switched on and a metric object exists. */
+    private boolean metricsEnabled() {
+        return collectMetrics && metric != null;
+    }
+
+    private void doCatchException(UnidirectionalRawPacketChannelContext<E> channelContext, Throwable cause) {
+        EmitterHandler handler = channelContext == null ? null : channelContext.getHandler();
+        EmitterUtils.doCatchException(channelContext, cause, handler, metricsEnabled(), metric, LOGGER);
+    }
+
+    private NetworkInterface findFirstUpInterface() throws IOException {
+        NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface();
+        if (ni == null) {
+            throw new IOException("Unable to find network interface.");
+        }
+
+        return ni;
+    }
+
+    private NetworkInterface findInterface(IpAddress address) throws IOException {
+        NetworkInterface ni = EmitterUtils.findNetworkInterface(address);
+        if (ni == null) {
+            throw new IOException("Unable to find network interface.");
+        }
+
+        return ni;
+    }
+
+    boolean containsContext(SessionInfo sessionInfo) {
+        return sessions.containsKey(sessionInfo);
+    }
+
+    private UnidirectionalRawPacketChannelContext<E> findContext(SessionInfo sessionInfo) {
+        UnidirectionalRawPacketChannelContext<E> context = sessions.get(sessionInfo);
+        if (context == null) {
+            throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo);
+        }
+
+        return context;
+    }
+
+    private int nextFreeLocalPort() throws IOException {
+        int port = portPool.borrow();
+        if (port == -1) {
+            throw new IOException("Bind error. Unable to get local port.");
+        }
+
+        return port;
+    }
+
+    /** Queues a connect request; it is executed by {@link #run()}. */
+    void connect(SessionInfo sessionInfo, EmitterHandler handler) {
+        tasks.add(new UnidirectionalTasks.ConnectTask(sessionInfo, handler));
+    }
+
+    /** Queues a flush request; it is executed by {@link #run()}. */
+    void flush(SessionInfo sessionInfo) {
+        tasks.add(new UnidirectionalTasks.FlushTask(sessionInfo));
+    }
+
+    /** Queues a close request; it is executed by {@link #run()}. */
+    void close(SessionInfo sessionInfo) {
+        tasks.add(new UnidirectionalTasks.CloseTask(sessionInfo));
+    }
+
+    /** Translates a network interface into the device name passed to {@link #doInitEngine}. */
+    protected abstract String resolveDevice(NetworkInterface networkInterface) throws IOException;
+
+    /** Creates or looks up the engine used to send the packets of the given channel. */
+    protected abstract E doInitEngine(UnidirectionalRawPacketChannelContext<E> channelContext, String device) throws IOException;
+
+    /**
+     * Registers a session: resolves addresses, interface and device, creates the channel
+     * context, initialises its engine and notifies the handler. Failures are reported through
+     * {@link #doCatchException} instead of being thrown.
+     */
+    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, metricsEnabled(), metric, LOGGER);
+        if (connParams == null) {
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        UnidirectionalRawPacketChannelContext<E> channelContext = null;
+
+        String device;
+        NetworkInterface networkInterface;
+        SocketAddress localAddress = connParams.getBindAddress();
+        MACAddress localMac;
+        int borrowedPort = -1;
+        try {
+            if (localAddress == null || ANY_SOCKET.equals(localAddress)) {
+                networkInterface = findFirstUpInterface();
+                // nextElement() on an empty enumeration throws an unchecked exception that
+                // would terminate the worker thread; report it as a connect failure instead.
+                if (!networkInterface.getInetAddresses().hasMoreElements()) {
+                    throw new IOException("Network interface '" + networkInterface.getName() + "' has no addresses.");
+                }
+                InetAddress inetAddress = networkInterface.getInetAddresses().nextElement();
+                borrowedPort = nextFreeLocalPort();
+                localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), borrowedPort);
+            } else {
+                networkInterface = findInterface(localAddress.getIp());
+            }
+
+            byte[] localHwAddress = networkInterface.getHardwareAddress();
+            if (localHwAddress == null) {
+                localHwAddress = ZERO_MAC;
+            }
+            localMac = new MACAddress(localHwAddress);
+            device = resolveDevice(networkInterface);
+            LOGGER.debug("Found device {} for ifc {}", device, networkInterface);
+        } catch (IOException ex) {
+            // No context exists yet, so nothing else would ever give the port back.
+            if (borrowedPort != -1) {
+                portPool.release(borrowedPort);
+            }
+            doCatchException(channelContext, ex);
+            return;
+        }
+
+        SocketAddress remoteAddress = connParams.getRemoteAddress();
+        if (remoteAddress == null) {
+            remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+        }
+
+        MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp());
+        channelContext = new UnidirectionalRawPacketChannelContext<>(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac);
+        sessions.put(sessionInfo, channelContext);
+
+        try {
+            handler.channelRegistered(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+
+        try {
+            E engine = doInitEngine(channelContext, device);
+            channelContext.setEngine(engine);
+            channelContext.setDevice(device);
+            handler.channelActive(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+    }
+
+    /** Writes the queued data of the channel and returns the number of bytes written. */
+    protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext) throws IOException;
+
+    /**
+     * Flushes the queued data of a session and notifies its handler. A failed write is
+     * reported to the handler and closes the session.
+     */
+    private void doWrite(SessionInfo sessionInfo) {
+        UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo);
+        if (channelContext == null) {
+            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
+            return;
+        }
+
+        Queue<byte[]> queue = channelContext.dataQueue;
+        if (queue.isEmpty()) {
+            return;
+        }
+
+        EmitterHandler handler = channelContext.getHandler();
+        handler.dataWriteStart(channelContext);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+        }
+
+        int written;
+        try {
+            written = doWrite0(channelContext);
+        } catch (Exception e) {
+            doCatchException(channelContext, e);
+            doClose(channelContext);
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+        }
+
+        try {
+            handler.dataWritten(channelContext);
+            LOGGER.debug("Write handled.");
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+    }
+
+    /** Closes the session if it is registered; unknown sessions are only logged. */
+    protected void doClose(SessionInfo sessionInfo) {
+        UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo);
+        if (channelContext == null) {
+            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
+            return;
+        }
+
+        doClose(channelContext);
+    }
+
+    /** Releases the engine-specific resources of the channel. */
+    protected abstract void doClose0(UnidirectionalRawPacketChannelContext<E> channelContext);
+
+    /**
+     * Tears the channel down: releases the port, lets the subclass release its resources,
+     * unregisters the session and notifies the handler.
+     */
+    private void doClose(UnidirectionalRawPacketChannelContext<E> channelContext) {
+        SessionInfo sessionInfo = channelContext.getSessionInfo();
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closing session '{}'.", sessionInfo);
+        }
+
+        EmitterHandler handler = channelContext.getHandler();
+        // NOTE(review): doConnect borrows the local port of the channel, yet the session's
+        // source port is released here - confirm which port the pool expects back.
+        portPool.release(sessionInfo.getSrcPort());
+
+        doClose0(channelContext);
+        try {
+            handler.channelInactive(channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        sessions.remove(sessionInfo);
+        try {
+            handler.channelUnregistered(channelContext);
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Closed session '{}'.", channelContext.getSessionInfo());
+        }
+
+        if (metricsEnabled()) {
+            metric.incClosedConnections();
+        }
+    }
+
+    /** Takes queued requests one by one and executes them until {@code working} is cleared. */
+    @Override
+    public void run() {
+        while (working) {
+            UnidirectionalTasks.UnidirectionalTask task = null;
+            try {
+                task = tasks.take();
+            } catch (InterruptedException ignored) {
+                // The interrupt only wakes the loop so that it re-checks the working flag.
+            }
+
+            if (task != null) {
+                switch (task.code) {
+                    case Task.CONNECT:
+                        UnidirectionalTasks.ConnectTask connectTask = (UnidirectionalTasks.ConnectTask) task;
+                        doConnect(connectTask.sessionInfo, connectTask.handler);
+                        break;
+                    case Task.FLUSH:
+                        UnidirectionalTasks.FlushTask flushTask = (UnidirectionalTasks.FlushTask) task;
+                        doWrite(flushTask.sessionInfo);
+                        break;
+                    case Task.CLOSE:
+                        UnidirectionalTasks.CloseTask closeTask = (UnidirectionalTasks.CloseTask) task;
+                        doClose(closeTask.sessionInfo);
+                        break;
+                }
+            }
+
+        }
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,109 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.net.MACAddress;
-import com.passus.st.client.TestClientHandler;
-import com.passus.st.client.TestClientHandler.ClientEvent;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.MapBasedMACAddressResolver;
-import com.passus.st.emitter.PortPool;
-import com.passus.st.emitter.SessionInfo;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.text.ParseException;
-import java.util.Iterator;
-
-import static com.passus.st.client.TestClientHandler.EventType.*;
-import static org.testng.Assert.assertTrue;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
-
-public class UnidirectionalPcapWorkerTest {
-
-    private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver();
-
-    private final TestPortPool portPool = new TestPortPool();
-
-    @BeforeClass
-    public void beforeClass() throws ParseException {
-        macResolver.add("1.1.1.1 -> 11:11:11:11:11:11");
-        macResolver.add("2.2.2.2 -> 22:22:22:22:22:22");
-    }
-
-    private void startWorker(UnidirectionalPcapWorker worker) {
-        worker.start();
-        try {
-            Thread.sleep(200);
-        } catch (InterruptedException e) {
-
-        }
-    }
-
-    private void stopWorker(UnidirectionalPcapWorker worker) {
-        worker.working = false;
-        worker.interrupt();
-        try {
-            worker.join();
-        } catch (InterruptedException ignore) {
-
-        }
-    }
-
-    @Test
-    public void testConnect() throws Exception {
-        UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker();
-        try {
-            worker.setMacResolver(macResolver);
-            worker.setPortPool(portPool);
-            worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER);
-            startWorker(worker);
-
-            TestClientHandler handler = new TestClientHandler();
-
-            SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80");
-            worker.connect(sessionInfo, handler);
-            handler.waitForEvent(CHANNEL_REGISTERED);
-            assertTrue(portPool.borrowed);
-
-            worker.close(sessionInfo);
-            handler.waitForEvent(CHANNEL_UNREGISTERED);
-
-            UnidirectionalPcapChannelContext channelContext = (UnidirectionalPcapChannelContext) handler.getChannelContext();
-            assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort());
-            assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress());
-
-            Iterator<ClientEvent> it = handler.events().iterator();
-            assertEquals(CHANNEL_REGISTERED, it.next().getType());
-            assertEquals(CHANNEL_ACTIVE, it.next().getType());
-            assertEquals(CHANNEL_INACTIVE, it.next().getType());
-            assertEquals(CHANNEL_UNREGISTERED, it.next().getType());
-            assertFalse(portPool.borrowed);
-        } finally {
-            stopWorker(worker);
-        }
-    }
-
-    static class TestPortPool implements PortPool {
-
-        public static final int PORT = 100;
-
-        boolean borrowed = false;
-
-        @Override
-        public int borrow() {
-            if (borrowed) {
-                return -1;
-            }
-
-            borrowed = true;
-            return PORT;
-        }
-
-        @Override
-        public void release(int port) {
-            if (borrowed) {
-                borrowed = false;
-            }
-        }
-    }
-}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java	Fri Apr 24 12:32:45 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,90 +0,0 @@
-package com.passus.st.emitter.pcap;
-
-import com.passus.data.HeapByteBuff;
-import com.passus.net.session.Session;
-import com.passus.st.AbstractWireMockTest;
-import com.passus.st.client.TestClientHandler;
-import com.passus.st.emitter.AbstractEmitterTest;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.MapBasedMACAddressResolver;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-public class UnidirectionalUdpPcapEmitterTest extends AbstractWireMockTest {
-
-    public UnidirectionalUdpPcapEmitter createEmitter(SessionMapper mapper) throws Exception {
-
-        MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver();
-        macResolver.add("* -> 00:00:00:00:00:00");
-
-        UnidirectionalUdpPcapEmitter emitter = new UnidirectionalUdpPcapEmitter();
-        emitter.setWorkersNum(1);
-        emitter.setMacResolver(macResolver);
-        if (mapper != null) {
-            emitter.setSessionMapper(mapper);
-        }
-        return emitter;
-    }
-
-    @Test
-    public void testConnectAndClose() throws Exception {
-        UnidirectionalUdpPcapEmitter emitter = createEmitter(null);
-        try {
-            emitter.start();
-            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP);
-
-            TestClientHandler handler = new TestClientHandler() {
-                @Override
-                protected void doChannelActive(ChannelContext context) throws Exception {
-                    context.close();
-                }
-            };
-
-            emitter.connect(info, handler, 0);
-            AbstractEmitterTest.waitConn(handler, 4);
-
-            AssertJUnit.assertEquals(4, handler.size());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(2).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(3).getType());
-        } finally {
-            emitter.stop();
-        }
-    }
-
-    @Test
-    public void testSend() throws Exception {
-        final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0};
-
-        UnidirectionalUdpPcapEmitter emitter = createEmitter(null);
-        emitter.start();
-        try {
-            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP);
-
-            TestClientHandler handler = new TestClientHandler() {
-
-                @Override
-                protected void doChannelActive(ChannelContext context) throws Exception {
-                    context.writeAndFlush(new HeapByteBuff(ETH));
-                    context.close();
-                }
-            };
-
-            emitter.connect(info, handler, 0);
-            AbstractEmitterTest.waitConn(handler, 5, 5_000);
-
-            AssertJUnit.assertEquals(5, handler.size());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.DATA_WRITTEN, handler.get(2).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(3).getType());
-            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(4).getType());
-        } finally {
-            emitter.stop();
-        }
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,91 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.data.HeapByteBuff;
+import com.passus.net.session.Session;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.client.TestClientHandler;
+import com.passus.st.emitter.AbstractEmitterTest;
+import com.passus.st.emitter.ChannelContext;
+import com.passus.st.emitter.MapBasedMACAddressResolver;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.raw.PcapUnidirectionalRawPacketEmitter;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+public class PcapUnidirectionalRawPacketEmitterTest extends AbstractWireMockTest {
+
+    /** Creates a one-worker emitter; {@code mapper} is installed only when it is not null. */
+    public PcapUnidirectionalRawPacketEmitter createEmitter(SessionMapper mapper) throws Exception {
+        MapBasedMACAddressResolver resolver = new MapBasedMACAddressResolver();
+        resolver.add("* -> 00:00:00:00:00:00");
+        PcapUnidirectionalRawPacketEmitter result = new PcapUnidirectionalRawPacketEmitter();
+        result.setWorkersNum(1);
+        result.setMacResolver(resolver);
+        if (mapper != null) {
+            result.setSessionMapper(mapper);
+        }
+        return result;
+    }
+
+    /** Asserts that the handler recorded exactly the given event types, in the given order. */
+    private static void assertEvents(TestClientHandler handler, TestClientHandler.EventType... expected) {
+        AssertJUnit.assertEquals(expected.length, handler.size());
+        for (int i = 0; i < expected.length; i++) {
+            AssertJUnit.assertEquals(expected[i], handler.get(i).getType());
+        }
+    }
+
+    /** A handler that closes the channel once it is active must record four lifecycle events. */
+    @Test
+    public void testConnectAndClose() throws Exception {
+        PcapUnidirectionalRawPacketEmitter emitter = createEmitter(null);
+        try {
+            emitter.start();
+            SessionInfo session = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP);
+            TestClientHandler handler = new TestClientHandler() {
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    context.close();
+                }
+            };
+            emitter.connect(session, handler, 0);
+            AbstractEmitterTest.waitConn(handler, 4);
+            assertEvents(handler,
+                    TestClientHandler.EventType.CHANNEL_REGISTERED,
+                    TestClientHandler.EventType.CHANNEL_ACTIVE,
+                    TestClientHandler.EventType.CHANNEL_INACTIVE,
+                    TestClientHandler.EventType.CHANNEL_UNREGISTERED);
+        } finally {
+            emitter.stop();
+        }
+    }
+
+    /** Writing one buffer before closing must add a DATA_WRITTEN event to the recorded sequence. */
+    @Test
+    public void testSend() throws Exception {
+        final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0};
+        PcapUnidirectionalRawPacketEmitter emitter = createEmitter(null);
+        emitter.start();
+        try {
+            SessionInfo session = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP);
+            TestClientHandler handler = new TestClientHandler() {
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    context.writeAndFlush(new HeapByteBuff(ETH));
+                    context.close();
+                }
+            };
+            emitter.connect(session, handler, 0);
+            AbstractEmitterTest.waitConn(handler, 5, 5_000);
+            assertEvents(handler,
+                    TestClientHandler.EventType.CHANNEL_REGISTERED,
+                    TestClientHandler.EventType.CHANNEL_ACTIVE,
+                    TestClientHandler.EventType.DATA_WRITTEN,
+                    TestClientHandler.EventType.CHANNEL_INACTIVE,
+                    TestClientHandler.EventType.CHANNEL_UNREGISTERED);
+        } finally {
+            emitter.stop();
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorkerTest.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,109 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.net.MACAddress;
+import com.passus.st.client.TestClientHandler;
+import com.passus.st.client.TestClientHandler.ClientEvent;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.MapBasedMACAddressResolver;
+import com.passus.st.emitter.PortPool;
+import com.passus.st.emitter.SessionInfo;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.text.ParseException;
+import java.util.Iterator;
+
+import static com.passus.st.client.TestClientHandler.EventType.*;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+
+public class PcapUnidirectionalRawPacketWorkerTest {
+
+    private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver();
+    private final TestPortPool portPool = new TestPortPool();
+
+    @BeforeClass
+    public void beforeClass() throws ParseException {
+        macResolver.add("1.1.1.1 -> 11:11:11:11:11:11");
+        macResolver.add("2.2.2.2 -> 22:22:22:22:22:22");
+    }
+
+    /** Starts the worker thread, then sleeps 200 ms; an interruption is re-asserted on the caller. */
+    private void startWorker(PcapUnidirectionalRawPacketWorker worker) {
+        worker.start();
+        try {
+            Thread.sleep(200);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Clears the worker's {@code working} flag, interrupts it and joins it; keeps the interrupt flag. */
+    private void stopWorker(PcapUnidirectionalRawPacketWorker worker) {
+        worker.working = false;
+        worker.interrupt();
+        try {
+            worker.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Connect/close round trip: port borrowed then released, MAC resolved, events in order. */
+    @Test
+    public void testConnect() throws Exception {
+        PcapUnidirectionalRawPacketWorker worker = new PcapUnidirectionalRawPacketWorker();
+        try {
+            worker.setMacResolver(macResolver);
+            worker.setPortPool(portPool);
+            worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER);
+            startWorker(worker);
+
+            TestClientHandler handler = new TestClientHandler();
+            SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80");
+            worker.connect(sessionInfo, handler);
+            handler.waitForEvent(CHANNEL_REGISTERED);
+            assertTrue(portPool.borrowed);
+
+            worker.close(sessionInfo);
+            handler.waitForEvent(CHANNEL_UNREGISTERED);
+
+            UnidirectionalRawPacketChannelContext channelContext = (UnidirectionalRawPacketChannelContext) handler.getChannelContext();
+            assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort());
+            assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress());
+            Iterator<ClientEvent> it = handler.events().iterator();
+            assertEquals(CHANNEL_REGISTERED, it.next().getType());
+            assertEquals(CHANNEL_ACTIVE, it.next().getType());
+            assertEquals(CHANNEL_INACTIVE, it.next().getType());
+            assertEquals(CHANNEL_UNREGISTERED, it.next().getType());
+            assertFalse(portPool.borrowed);
+        } finally {
+            stopWorker(worker);
+        }
+    }
+
+    /** Port pool holding the single port {@link #PORT}; it is handed to one borrower at a time. */
+    static class TestPortPool implements PortPool {
+        public static final int PORT = 100;
+        boolean borrowed = false;
+
+        /** Returns {@link #PORT} and marks it borrowed, or {@code -1} when it is already out. */
+        @Override
+        public int borrow() {
+            if (borrowed) {
+                return -1;
+            }
+            borrowed = true;
+            return PORT;
+        }
+
+        /** Marks the port as available again; the {@code port} argument is not inspected. */
+        @Override
+        public void release(int port) {
+            if (borrowed) {
+                borrowed = false;
+            }
+        }
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/NioServer.java	Mon Apr 27 12:36:42 2020 +0200
@@ -0,0 +1,187 @@
+package com.passus.st.utils.server;
+
+import com.passus.commons.service.Service;
+import com.passus.commons.service.ServiceException;
+import com.passus.st.utils.NioConnectionsCounter;
+import com.passus.st.utils.TestSslUtils;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jboss.netty.util.internal.SystemPropertyUtil;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+
+/** Netty-based TCP test server, optionally with an SSL handler, run as a {@link Service}. */
+public class NioServer implements Service {
+
+    private static final Logger LOGGER = LogManager.getLogger(NioServer.class);
+
+    private String address = "localhost";
+    private int port = 5000;
+    private boolean ssl = false;
+
+    private ChannelFuture channel;
+    private EventLoopGroup masterGroup;
+    private EventLoopGroup slaveGroup;
+
+    private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter();
+    private ChannelHandler serverHandler;
+    private boolean started;
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public void setSsl(boolean ssl) {
+        this.ssl = ssl;
+    }
+
+    public ChannelHandler getServerHandler() {
+        return serverHandler;
+    }
+
+    public void setServerHandler(ChannelHandler serverHandler) {
+        this.serverHandler = serverHandler;
+    }
+
+    public int getConnections() {
+        return connectionsCounter.getConnections();
+    }
+
+    public int getMaxConnections() {
+        return connectionsCounter.getMaxConnections();
+    }
+
+    public void setMaxConnections(int maxConnections) {
+        connectionsCounter.setMaxConnections(maxConnections);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Creates the TLS context from which server-side SSL engines are built.
+     * NOTE(review): the context is returned without being initialised (the init call below
+     * is commented out), so the {@code ssl} mode looks unfinished - confirm before use.
+     */
+    private SSLContext createSSLContext() {
+        SSLContext serverContext;
+        try {
+            TrustManagerFactory trustManagerFactory = TestSslUtils.loadTrustManagerFactory();
+
+            // Initialize the SSLContext to work with our key managers.
+            serverContext = SSLContext.getInstance("TLS");
+            //serverContext.init(trustManagerFactory.get(), null, null);
+        } catch (Exception e) {
+            throw new Error(
+                    "Failed to initialize the server-side SSLContext", e);
+        }
+
+        return serverContext;
+    }
+
+    /**
+     * Binds to {@code address:port} and blocks until the bind completes, so bind failures
+     * surface here as a {@link ServiceException} after cleanup. No-op when already started.
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            masterGroup = new NioEventLoopGroup();
+            slaveGroup = new NioEventLoopGroup();
+            ServerBootstrap bs = new ServerBootstrap();
+            bs.option(ChannelOption.SO_BACKLOG, 1024);
+            bs.group(masterGroup, slaveGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                            LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
+                            ctx.close();
+                        }
+
+                        @Override
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ChannelPipeline p = ch.pipeline();
+                            if (ssl) {
+                                SSLEngine engine = createSSLContext().createSSLEngine();
+                                engine.setUseClientMode(false);
+                                engine.setNeedClientAuth(false);
+                                p.addLast("ssl", new SslHandler(engine));
+                            }
+                            p.addLast(connectionsCounter);
+                            p.addLast(serverHandler);
+                        }
+                    });
+
+            LOGGER.debug("Starting Netty server {}:{}.", address, port);
+            channel = bs.bind(address, port);
+            channel.sync();
+            this.started = true;
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            LOGGER.error(ex.getMessage(), ex);
+            stop();
+            throw new ServiceException(ex.getMessage(), ex);
+        }
+    }
+
+    /** Closes the listening channel and shuts down both event loop groups; also cleans up a failed start. */
+    @Override
+    public void stop() {
+        if (!started && masterGroup == null && slaveGroup == null) {
+            return;
+        }
+
+        LOGGER.debug("Stopping Netty server {}:{}.", address, port);
+        try {
+            if (channel != null && channel.channel().isOpen()) {
+                channel.channel().close().sync();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            if (slaveGroup != null) {
+                slaveGroup.shutdownGracefully();
+            }
+            if (masterGroup != null) {
+                masterGroup.shutdownGracefully();
+            }
+            slaveGroup = null;
+            masterGroup = null;
+            channel = null;
+            started = false;
+        }
+    }
+}