Mercurial > stress-tester
changeset 1066:bea08c5fe560
UnidirectionalRawPacketChannelContext, UnidirectionalRawPacketEmitter, UnidirectionalRawPacketWorker
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/UnidirectionalTasks.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,41 @@ +package com.passus.st.emitter; + +public class UnidirectionalTasks { + + public static class UnidirectionalTask extends Task { + + public final SessionInfo sessionInfo; + + public UnidirectionalTask(int code, SessionInfo sessionInfo) { + super(code); + this.sessionInfo = sessionInfo; + } + } + + public final static class ConnectTask extends UnidirectionalTask { + + public final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) { + super(CONNECT, sessionInfo); + this.handler = handler; + } + + } + + public final static class CloseTask extends UnidirectionalTask { + + public CloseTask(SessionInfo sessionInfo) { + super(CLOSE, sessionInfo); + } + + } + + public final static class FlushTask extends UnidirectionalTask { + + public FlushTask(SessionInfo sessionInfo) { + super(FLUSH, sessionInfo); + } + + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/PcapOutput.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,78 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.pcap.Pcap; -import com.passus.pcap.PcapDumper; -import com.passus.pcap.PcapLinkType; - -/** - * - * @author mikolaj.podbielski - */ -public interface PcapOutput { - - Pcap pcap(); - - int sendPacket(int length, byte[] payload); - - void close(); - - public static class Sender implements PcapOutput { - - private final Pcap pcap; - - public Sender(Pcap pcap) { - this.pcap = pcap; - } - - @Override - public Pcap pcap() { - return pcap; - } - - @Override - public int sendPacket(int length, byte[] payload) { - return pcap.sendPacket(length, payload); - } - - @Override - public void close() { - pcap.close(); - } - } - - public static class Writer implements PcapOutput { - - private final Pcap pcap; - private final PcapDumper dumper; - - public Writer(Pcap pcap, PcapDumper dumper) { - this.pcap = pcap; - this.dumper = dumper; - } - - @Override - public Pcap pcap() { - return pcap; - } - - @Override - public int sendPacket(int length, byte[] payload) { - dumper.dump(System.currentTimeMillis(), length, payload); - return 0; - } - - @Override - public void close() { - dumper.close(); - pcap.close(); - } - - } - - public static Writer writer(String file) { - Pcap pcap = Pcap.openDead(PcapLinkType.DLT_EN10MB, 65536); - StringBuilder sb = new StringBuilder(); - PcapDumper pd = PcapDumper.open(pcap, "", sb); - return new Writer(pcap, pd); - } -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,157 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.data.ByteBuff; -import com.passus.net.MACAddress; -import com.passus.net.SocketAddress; -import com.passus.st.client.FlowContext; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.Queue; - -public class UnidirectionalPcapChannelContext implements ChannelContext { - - private static final int DEFAULT_BUFFER_SIZE = 65 * 1024; - - private final UnidirectionalPcapWorker worker; - - private PcapOutput pcapOut; - - final Queue<byte[]> dataQueue; - - final byte[] buffer; - - private FlowContext flowContext; - - private final EmitterHandler handler; - - private SessionInfo sessionInfo; - - private final SocketAddress localAddress; - - private final SocketAddress remoteAddress; - - private final MACAddress localHardwareAddress; - - private final MACAddress remoteHardwareAddress; - - private String device; - - public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker, - EmitterHandler handler, SessionInfo sessionInfo, - SocketAddress localAddress, SocketAddress remoteAddress, - MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) { - this.worker = worker; - this.handler = handler; - this.dataQueue = new LinkedList<>(); - this.sessionInfo = sessionInfo; - this.localAddress = localAddress; - this.remoteAddress = remoteAddress; - this.localHardwareAddress = localHardwareAddress; - this.remoteHardwareAddress = remoteHardwareAddress; - this.buffer = new byte[DEFAULT_BUFFER_SIZE]; - } - - PcapOutput getPcapOut() { - return pcapOut; - } - - void setPcapOut(PcapOutput pcapOut) { - this.pcapOut = pcapOut; - } - - String getDevice() { - return device; - } - - void setDevice(String device) { - this.device = device; - } - - @Override - public SocketAddress getLocalAddress() { - return localAddress; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SessionInfo getSessionInfo() { - return sessionInfo; - } - - public EmitterHandler getHandler() { - return handler; - } - - public MACAddress getLocalHardwareAddress() { - return localHardwareAddress; - } - - public MACAddress getRemoteHardwareAddress() { - return remoteHardwareAddress; - } - - @Override - public void setFlowContext(FlowContext attachment) { - this.flowContext = attachment; - } - - @Override - public FlowContext getFlowContext() { - return flowContext; - } - - @Override - public boolean isBidirectional() { - return false; - } - - @Override - public void setBidirectional(boolean bidirectional) { - if (bidirectional) { - throw new IllegalArgumentException("Bidirectional communication not supported."); - } - } - - @Override - public boolean isConnected() { - return true; - } - - @Override - public boolean isConnectionPending() { - return false; - } - - - @Override - public void write(byte[] data, int offset, int length) throws IOException { - byte[] out = new byte[length]; - System.arraycopy(data, offset, out, 0, length); - dataQueue.add(out); - } - - @Override - public void write(ByteBuff data) throws IOException { - dataQueue.add(data.toArray()); - } - - @Override - public void flush() throws IOException { - worker.flush(sessionInfo); - } - - @Override - public void close() throws IOException { - worker.close(sessionInfo); - } - - -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,401 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.commons.Assert; -import com.passus.net.IpAddress; -import com.passus.net.MACAddress; -import com.passus.net.SocketAddress; -import com.passus.pcap.Pcap; -import com.passus.pcap.PcapIfc; -import com.passus.st.emitter.*; -import com.passus.st.emitter.SessionMapper.ConnectionParams; -import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask; -import com.passus.st.emitter.pcap.UnidirectionalTasks.ConnectTask; -import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask; -import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask; -import com.passus.st.metric.MetricSource; -import com.passus.st.metric.MetricsContainer; -import org.apache.commons.lang3.SystemUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress; -import static com.passus.st.emitter.EmitterUtils.getConnectionParams; -import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; - -class UnidirectionalPcapWorker extends Thread implements MetricSource { - - protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0}; - - protected final Logger LOGGER = LogManager.getLogger(UnidirectionalPcapWorker.class); - - boolean working = true; - - protected final BlockingQueue<UnidirectionalTask> tasks = new LinkedBlockingQueue<>(); - - private Map<SessionInfo, UnidirectionalPcapChannelContext> sessions = new HashMap<>(); - - private SessionMapper sessionMapper; - - private volatile boolean collectMetrics = false; - - private EmitterMetric metric; - - private PortPool portPool = new PortPoolImpl(); - - private MACAddressResolver macResolver; - - private final Map<String, PcapInstance> pcaps = new HashMap<>(); - - public PortPool getPortPool() { - return portPool; - } - - public void setPortPool(PortPool portPool) { - Assert.notNull(portPool, "portPool"); - this.portPool = portPool; - } - - public MACAddressResolver getMacResolver() { - return macResolver; - } - - public void setMacResolver(MACAddressResolver macResolver) { - Assert.notNull(macResolver, "macResolver"); - this.macResolver = macResolver; - } - - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - public void setSessionMapper(SessionMapper sessionMapper) { - Assert.notNull(sessionMapper, "sessionMapper"); - this.sessionMapper = sessionMapper; - } - - @Override - public boolean isCollectMetrics() { - return collectMetrics; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - this.collectMetrics = collectMetrics; - } - - @Override - public void writeMetrics(MetricsContainer container) { - if (collectMetrics) { - container.update(System.currentTimeMillis(), metric); - metric.reset(); - } - } - - private void doCatchException(UnidirectionalPcapChannelContext channelContext, Throwable cause) { - EmitterHandler handler = channelContext == null ? null : channelContext.getHandler(); - EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER); - } - - private NetworkInterface findFirstUpInterface() throws IOException { - NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface(); - if (ni == null) { - throw new IOException("Unable to find network interface."); - } - - return ni; - } - - private NetworkInterface findInterface(IpAddress address) throws IOException { - NetworkInterface ni = EmitterUtils.findNetworkInterface(address); - if (ni == null) { - throw new IOException("Unable to find network interface."); - } - - return ni; - } - - boolean containsContext(SessionInfo sessionInfo) { - return sessions.containsKey(sessionInfo); - } - - private UnidirectionalPcapChannelContext findContext(SessionInfo sessionInfo) { - UnidirectionalPcapChannelContext context = sessions.get(sessionInfo); - if (context == null) { - throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo); - } - - return context; - } - - private int nextFreeLocalPort() throws IOException { - int port = portPool.borrow(); - if (port == -1) { - throw new IOException("Bind error. Unable to get local port."); - } - - return port; - } - - void connect(SessionInfo sessionInfo, EmitterHandler handler) { - tasks.add(new ConnectTask(sessionInfo, handler)); - } - - void flush(SessionInfo sessionInfo) { - tasks.add(new FlushTask(sessionInfo)); - } - - void close(SessionInfo sessionInfo) { - tasks.add(new CloseTask(sessionInfo)); - } - - private String getPcapDeviceName(NetworkInterface ifc) throws SocketException { - //Windows workaround - if (SystemUtils.IS_OS_WINDOWS) { - byte[] ifcMac = ifc.getHardwareAddress(); - StringBuilder sb = new StringBuilder(); - PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb); - for (int i = 0; i < pcapIfcs.length; i++) { - PcapIfc pcapIfc = pcapIfcs[i]; - byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name); - if (Arrays.equals(ifcMac, pcapIfcMac)) { - return pcapIfc.name; - } - } - } - - return ifc.getName(); - } - - private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { - ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); - if (connParams == null) { - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); - } - - UnidirectionalPcapChannelContext channelContext = null; - - String device; - NetworkInterface networkInterface; - SocketAddress localAddress = connParams.getBindAddress(); - MACAddress localMac; - try { - if (localAddress == null || ANY_SOCKET.equals(localAddress)) { - networkInterface = findFirstUpInterface(); - InetAddress inetAddress = networkInterface.getInetAddresses().nextElement(); - int port = nextFreeLocalPort(); - localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), port); - } else { - networkInterface = findInterface(localAddress.getIp()); - } - - byte[] localHwAddress = networkInterface.getHardwareAddress(); - if (localHwAddress == null) { - localHwAddress = ZERO_MAC; - } - localMac = new MACAddress(localHwAddress); - device = getPcapDeviceName(networkInterface); - LOGGER.debug("Found device " + device + " for ifc " + networkInterface); - } catch (IOException ex) { - doCatchException(channelContext, ex); - return; - } - - SocketAddress remoteAddress = connParams.getRemoteAddress(); - if (remoteAddress == null) { - remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); - } - - MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); - channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); - sessions.put(sessionInfo, channelContext); - - try { - handler.channelRegistered(channelContext); - } catch (Exception ex) { - doCatchException(channelContext, ex); - } - - try { - PcapInstance pcapInstance = pcaps.get(device); - if (pcapInstance == null) { - StringBuilder sb = new StringBuilder(); - Pcap pcap = Pcap.openLive(device, sb); - if (pcap == null) { - doClose(sessionInfo); - throw new IOException("Unable to open pcap on device '" + device + ".' " + sb); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Pcap instance for device {} created.", device); - } - - pcapInstance = new PcapInstance(new PcapOutput.Sender(pcap)); - pcaps.put(device, pcapInstance); - } - - channelContext.setPcapOut(pcapInstance.pcap); - channelContext.setDevice(device); - pcapInstance.borrows++; - - handler.channelActive(channelContext); - } catch (Exception ex) { - doCatchException(channelContext, ex); - } - } - - private void doWrite(SessionInfo sessionInfo) { - UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo); - if (channelContext == null) { - LOGGER.debug("Unable to find context for session {}.", sessionInfo); - return; - } - - Queue<byte[]> queue = channelContext.dataQueue; - if (queue.isEmpty()) { - return; - } - - EmitterHandler handler = channelContext.getHandler(); - handler.dataWriteStart(channelContext); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); - } - - int written = 0; - try { - PcapOutput pcap = channelContext.getPcapOut(); - - while (!queue.isEmpty()) { - byte[] buf = queue.poll(); - int res = pcap.sendPacket(buf.length, buf); - if (res == 0) { - written += buf.length; - } else { - throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr()); - } - } - } catch (Exception e) { - doCatchException(channelContext, e); - doClose(channelContext); - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress()); - } - - try { - handler.dataWritten(channelContext); - LOGGER.debug("Write handled."); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - } - - private void doClose(SessionInfo sessionInfo) { - UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo); - if (channelContext == null) { - LOGGER.debug("Unable to find context for session {}.", sessionInfo); - return; - } - - doClose(channelContext); - } - - private void doClose(UnidirectionalPcapChannelContext channelContext) { - SessionInfo sessionInfo = channelContext.getSessionInfo(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closing session '" + sessionInfo + "'."); - } - - EmitterHandler handler = channelContext.getHandler(); - portPool.release(sessionInfo.getSrcPort()); - - String device = channelContext.getDevice(); - PcapInstance pcapInstance = pcaps.get(device); - if (pcapInstance != null) { - pcapInstance.borrows--; - if (pcapInstance.borrows == 0) { - pcapInstance.pcap.close(); - pcaps.remove(device); - } - } - - try { - handler.channelInactive(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - sessions.remove(sessionInfo); - try { - handler.channelUnregistered(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); - } - - if (collectMetrics) { - metric.incClosedConnections(); - } - } - - @Override - public void run() { - while (working) { - UnidirectionalTask task = null; - try { - task = tasks.take(); - } catch (InterruptedException ignore) { - - } - - if (task != null) { - switch (task.code) { - case Task.CONNECT: - ConnectTask connectTask = (ConnectTask) task; - doConnect(connectTask.sessionInfo, connectTask.handler); - break; - case Task.FLUSH: - FlushTask flushTask = (FlushTask) task; - doWrite(flushTask.sessionInfo); - break; - case Task.CLOSE: - CloseTask closeTask = (CloseTask) task; - doClose(closeTask.sessionInfo); - break; - } - } - - } - } - - private static class PcapInstance { - - private final PcapOutput pcap; - - private int borrows; - - public PcapInstance(PcapOutput pcap) { - this.pcap = pcap; - } - } -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,45 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.Task; - -public class UnidirectionalTasks { - - protected static class UnidirectionalTask extends Task { - - final SessionInfo sessionInfo; - - public UnidirectionalTask(int code, SessionInfo sessionInfo) { - super(code); - this.sessionInfo = sessionInfo; - } - } - - protected final static class ConnectTask extends UnidirectionalTask { - - final EmitterHandler handler; - - public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) { - super(CONNECT, sessionInfo); - this.handler = handler; - } - - } - - protected final static class CloseTask extends UnidirectionalTask { - - public CloseTask(SessionInfo sessionInfo) { - super(CLOSE, sessionInfo); - } - - } - - protected final static class FlushTask extends UnidirectionalTask { - - public FlushTask(SessionInfo sessionInfo) { - super(FLUSH, sessionInfo); - } - - } -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,136 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.commons.Assert; -import com.passus.net.session.Session; -import com.passus.st.emitter.*; -import com.passus.st.metric.MetricsContainer; - -import java.io.IOException; - -public class UnidirectionalUdpPcapEmitter implements Emitter { - - private static final int DEFAULT_WORKERS_NUM = 4; - - private boolean started = false; - - private UnidirectionalPcapWorker[] workers; - - private int workersNum = DEFAULT_WORKERS_NUM; - - private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl()); - - private MACAddressResolver macResolver; - - private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; - - private boolean collectMetrics = false; - - @Override - public void setSessionMapper(SessionMapper sessionMapper) { - this.sessionMapper = sessionMapper; - } - - @Override - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - public int getWorkersNum() { - return workersNum; - } - - public void setWorkersNum(int workersNum) { - Assert.greaterThanZero(workersNum, "workersNum"); - this.workersNum = workersNum; - } - - public MACAddressResolver getMacResolver() { - return macResolver; - } - - public void setMacResolver(MACAddressResolver macResolver) { - this.macResolver = macResolver; - } - - public PortPool getPortPool() { - return portPool; - } - - public void setPortPool(PortPool portPool) { - Assert.notNull(portPool, "portPool"); - this.portPool = new SynchronizedPortPoolWrapper(portPool); - } - - @Override - public boolean isCollectMetrics() { - return collectMetrics; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - this.collectMetrics = collectMetrics; - } - - @Override - public void writeMetrics(MetricsContainer container) { - throw new RuntimeException("Not implemented yet."); - } - - @Override - public boolean isStarted() { - return started; - } - - @Override - public void start() { - if (started) { - return; - } - - workers = new UnidirectionalPcapWorker[workersNum]; - for (int i = 0; i < workersNum; i++) { - UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker(); - worker.setPortPool(portPool); - worker.setMacResolver(macResolver); - worker.setSessionMapper(sessionMapper); - worker.start(); - workers[i] = worker; - } - - started = true; - } - - @Override - public void stop() { - if (!started) { - return; - } - - for (int i = 0; i < workersNum; i++) { - UnidirectionalPcapWorker worker = workers[i]; - worker.working = false; - worker.interrupt(); - - try { - worker.join(); - } catch (InterruptedException ignore) { - - } - } - - workers = null; - started = false; - } - - @Override - public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { - if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) { - throw new IllegalArgumentException("Only UDP transport supported."); - } - - int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; - int index = hashCode % workers.length; - workers[index].connect(sessionInfo, handler); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapOutput.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,78 @@ +package com.passus.st.emitter.raw; + +import com.passus.pcap.Pcap; +import com.passus.pcap.PcapDumper; +import com.passus.pcap.PcapLinkType; + +/** + * + * @author mikolaj.podbielski + */ +public interface PcapOutput { + + Pcap pcap(); + + int sendPacket(int length, byte[] payload); + + void close(); + + public static class Sender implements PcapOutput { + + private final Pcap pcap; + + public Sender(Pcap pcap) { + this.pcap = pcap; + } + + @Override + public Pcap pcap() { + return pcap; + } + + @Override + public int sendPacket(int length, byte[] payload) { + return pcap.sendPacket(length, payload); + } + + @Override + public void close() { + pcap.close(); + } + } + + public static class Writer implements PcapOutput { + + private final Pcap pcap; + private final PcapDumper dumper; + + public Writer(Pcap pcap, PcapDumper dumper) { + this.pcap = pcap; + this.dumper = dumper; + } + + @Override + public Pcap pcap() { + return pcap; + } + + @Override + public int sendPacket(int length, byte[] payload) { + dumper.dump(System.currentTimeMillis(), length, payload); + return 0; + } + + @Override + public void close() { + dumper.close(); + pcap.close(); + } + + } + + public static Writer writer(String file) { + Pcap pcap = Pcap.openDead(PcapLinkType.DLT_EN10MB, 65536); + StringBuilder sb = new StringBuilder(); + PcapDumper pd = PcapDumper.open(pcap, "", sb); + return new Writer(pcap, pd); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,14 @@ +package com.passus.st.emitter.raw; + +public class PcapUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<PcapOutput> { + + @Override + protected UnidirectionalRawPacketWorker<PcapOutput>[] createWorkersArray(int workersNum) { + return new PcapUnidirectionalRawPacketWorker[workersNum]; + } + + @Override + protected UnidirectionalRawPacketWorker<PcapOutput> createWorker() { + return new PcapUnidirectionalRawPacketWorker(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,101 @@ +package com.passus.st.emitter.raw; + +import com.passus.pcap.Pcap; +import com.passus.pcap.PcapIfc; +import org.apache.commons.lang3.SystemUtils; + +import java.io.IOException; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +class PcapUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<PcapOutput> { + + private final Map<String, PcapInstance> pcaps = new HashMap<>(); + + protected String resolveDevice(NetworkInterface ifc) throws SocketException { + //Windows workaround + if (SystemUtils.IS_OS_WINDOWS) { + byte[] ifcMac = ifc.getHardwareAddress(); + StringBuilder sb = new StringBuilder(); + PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb); + for (int i = 0; i < pcapIfcs.length; i++) { + PcapIfc pcapIfc = pcapIfcs[i]; + byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name); + if (Arrays.equals(ifcMac, pcapIfcMac)) { + return pcapIfc.name; + } + } + } + + return ifc.getName(); + } + + @Override + protected PcapOutput doInitEngine(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext, String device) throws IOException { + PcapInstance pcapInstance = pcaps.get(device); + if (pcapInstance == null) { + StringBuilder sb = new StringBuilder(); + Pcap pcap = Pcap.openLive(device, sb); + if (pcap == null) { + doClose(channelContext.getSessionInfo()); + throw new IOException("Unable to open pcap on device '" + device + ".' " + sb); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Pcap instance for device {} created.", device); + } + + pcapInstance = new PcapInstance(new PcapOutput.Sender(pcap)); + pcaps.put(device, pcapInstance); + } + + return pcapInstance.pcap; + } + + @Override + protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) throws IOException { + PcapOutput pcap = channelContext.getEngine(); + Queue<byte[]> queue = channelContext.getDataQueue(); + int written = 0; + while (!queue.isEmpty()) { + byte[] buf = queue.poll(); + int res = pcap.sendPacket(buf.length, buf); + if (res == 0) { + written += buf.length; + } else { + throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr()); + } + } + + return written; + } + + @Override + protected void doClose0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) { + String device = channelContext.getDevice(); + PcapInstance pcapInstance = pcaps.get(device); + if (pcapInstance != null) { + pcapInstance.borrows--; + if (pcapInstance.borrows == 0) { + pcapInstance.pcap.close(); + pcaps.remove(device); + } + } + } + + + private static class PcapInstance { + + private final PcapOutput pcap; + + private int borrows; + + public PcapInstance(PcapOutput pcap) { + this.pcap = pcap; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,160 @@ +package com.passus.st.emitter.raw; + +import com.passus.data.ByteBuff; +import com.passus.net.MACAddress; +import com.passus.net.SocketAddress; +import com.passus.st.client.FlowContext; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +public class UnidirectionalRawPacketChannelContext<E> implements ChannelContext { + + private static final int DEFAULT_BUFFER_SIZE = 65 * 1024; + + private final UnidirectionalRawPacketWorker<E> worker; + + private E engine; + + final Queue<byte[]> dataQueue; + + final byte[] buffer; + + private FlowContext flowContext; + + private final EmitterHandler handler; + + private SessionInfo sessionInfo; + + private final SocketAddress localAddress; + + private final SocketAddress remoteAddress; + + private final MACAddress localHardwareAddress; + + private final MACAddress remoteHardwareAddress; + + private String device; + + public UnidirectionalRawPacketChannelContext(UnidirectionalRawPacketWorker<E> worker, + EmitterHandler handler, SessionInfo sessionInfo, + SocketAddress localAddress, SocketAddress remoteAddress, + MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) { + this.worker = worker; + this.handler = handler; + this.dataQueue = new LinkedList<>(); + this.sessionInfo = sessionInfo; + this.localAddress = localAddress; + this.remoteAddress = remoteAddress; + this.localHardwareAddress = localHardwareAddress; + this.remoteHardwareAddress = remoteHardwareAddress; + this.buffer = new byte[DEFAULT_BUFFER_SIZE]; + } + + public E getEngine() { + return engine; + } + + void setEngine(E engine) { + this.engine = engine; + } + + public String getDevice() { + return device; + } + + void setDevice(String device) { + this.device = device; + } + + public Queue<byte[]> getDataQueue() { + return dataQueue; + } + + @Override + public SocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + + public EmitterHandler getHandler() { + return handler; + } + + public MACAddress getLocalHardwareAddress() { + return localHardwareAddress; + } + + public MACAddress getRemoteHardwareAddress() { + return remoteHardwareAddress; + } + + @Override + public void setFlowContext(FlowContext attachment) { + this.flowContext = attachment; + } + + @Override + public FlowContext getFlowContext() { + return flowContext; + } + + @Override + public boolean isBidirectional() { + return false; + } + + @Override + public void setBidirectional(boolean bidirectional) { + if (bidirectional) { + throw new IllegalArgumentException("Bidirectional communication not supported."); + } + } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public boolean isConnectionPending() { + return false; + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + byte[] out = new byte[length]; + System.arraycopy(data, offset, out, 0, length); + dataQueue.add(out); + } + + @Override + public void write(ByteBuff data) throws IOException { + dataQueue.add(data.toArray()); + } + + @Override + public void flush() throws IOException { + worker.flush(sessionInfo); + } + + @Override + public void close() throws IOException { + worker.close(sessionInfo); + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,140 @@ +package com.passus.st.emitter.raw; + +import com.passus.commons.Assert; +import com.passus.net.session.Session; +import com.passus.st.emitter.*; +import com.passus.st.metric.MetricsContainer; + +import java.io.IOException; + +public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter { + + private static final int DEFAULT_WORKERS_NUM = 4; + + private boolean started = false; + + private UnidirectionalRawPacketWorker<E>[] workers; + + private int workersNum = DEFAULT_WORKERS_NUM; + + private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl()); + + private MACAddressResolver macResolver; + + private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; + + private boolean collectMetrics = false; + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public int getWorkersNum() { + return workersNum; + } + + public void setWorkersNum(int workersNum) { + Assert.greaterThanZero(workersNum, "workersNum"); + this.workersNum = workersNum; + } + + public MACAddressResolver getMacResolver() { + return macResolver; + } + + public void setMacResolver(MACAddressResolver macResolver) { + this.macResolver = macResolver; + } + + public PortPool getPortPool() { + return portPool; + } + + public void setPortPool(PortPool portPool) { + Assert.notNull(portPool, "portPool"); + this.portPool = new SynchronizedPortPoolWrapper(portPool); + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public void writeMetrics(MetricsContainer container) { + throw new RuntimeException("Not implemented yet."); + } + + @Override + public boolean isStarted() { + return started; + } + + protected abstract UnidirectionalRawPacketWorker<E>[] createWorkersArray(int workersNum); + + protected abstract UnidirectionalRawPacketWorker<E> createWorker(); + + @Override + public void start() { + if (started) { + return; + } + + workers = createWorkersArray(workersNum); + for (int i = 0; i < workersNum; i++) { + UnidirectionalRawPacketWorker<E> worker = createWorker(); + worker.setPortPool(portPool); + worker.setMacResolver(macResolver); + worker.setSessionMapper(sessionMapper); + worker.start(); + workers[i] = worker; + } + + started = true; + } + + @Override + public void stop() { + if (!started) { + return; + } + + for (int i = 0; i < workersNum; i++) { + UnidirectionalRawPacketWorker<E> worker = workers[i]; + worker.working = false; + worker.interrupt(); + + try { + worker.join(); + } catch (InterruptedException ignore) { + + } + } + + workers = null; + started = false; + } + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { + if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) { + throw new IllegalArgumentException("Only UDP transport supported."); + } + + int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; + int index = hashCode % workers.length; + workers[index].connect(sessionInfo, handler); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,342 @@ +package com.passus.st.emitter.raw; + +import com.passus.commons.Assert; +import com.passus.net.IpAddress; +import com.passus.net.MACAddress; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.*; +import com.passus.st.metric.MetricSource; +import com.passus.st.metric.MetricsContainer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress; +import static com.passus.st.emitter.EmitterUtils.getConnectionParams; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; + +public abstract class UnidirectionalRawPacketWorker<E> extends Thread implements MetricSource { + + protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0}; + + protected final Logger LOGGER = LogManager.getLogger(UnidirectionalRawPacketWorker.class); + + boolean working = true; + + protected final BlockingQueue<UnidirectionalTasks.UnidirectionalTask> tasks = new LinkedBlockingQueue<>(); + + private Map<SessionInfo, UnidirectionalRawPacketChannelContext<E>> sessions = new HashMap<>(); + + private SessionMapper sessionMapper; + + private volatile boolean collectMetrics = false; + + private EmitterMetric metric; + + private PortPool portPool = new PortPoolImpl(); + + private MACAddressResolver macResolver; + + public PortPool getPortPool() { + return portPool; + } + + public void setPortPool(PortPool portPool) { + Assert.notNull(portPool, "portPool"); + this.portPool = portPool; + } + + public MACAddressResolver getMacResolver() { + return macResolver; + } + + public void setMacResolver(MACAddressResolver macResolver) { + Assert.notNull(macResolver, "macResolver"); + this.macResolver = macResolver; + } + + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public void setSessionMapper(SessionMapper sessionMapper) { + Assert.notNull(sessionMapper, "sessionMapper"); + this.sessionMapper = sessionMapper; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + private void doCatchException(UnidirectionalRawPacketChannelContext<E> channelContext, Throwable cause) { + EmitterHandler handler = channelContext == null ? null : channelContext.getHandler(); + EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER); + } + + private NetworkInterface findFirstUpInterface() throws IOException { + NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface(); + if (ni == null) { + throw new IOException("Unable to find network interface."); + } + + return ni; + } + + private NetworkInterface findInterface(IpAddress address) throws IOException { + NetworkInterface ni = EmitterUtils.findNetworkInterface(address); + if (ni == null) { + throw new IOException("Unable to find network interface."); + } + + return ni; + } + + boolean containsContext(SessionInfo sessionInfo) { + return sessions.containsKey(sessionInfo); + } + + private UnidirectionalRawPacketChannelContext<E> findContext(SessionInfo sessionInfo) { + UnidirectionalRawPacketChannelContext<E> context = sessions.get(sessionInfo); + if (context == null) { + throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo); + } + + return context; + } + + private int nextFreeLocalPort() throws IOException { + int port = portPool.borrow(); + if (port == -1) { + throw new IOException("Bind error. Unable to get local port."); + } + + return port; + } + + void connect(SessionInfo sessionInfo, EmitterHandler handler) { + tasks.add(new UnidirectionalTasks.ConnectTask(sessionInfo, handler)); + } + + void flush(SessionInfo sessionInfo) { + tasks.add(new UnidirectionalTasks.FlushTask(sessionInfo)); + } + + void close(SessionInfo sessionInfo) { + tasks.add(new UnidirectionalTasks.CloseTask(sessionInfo)); + } + + protected abstract String resolveDevice(NetworkInterface networkInterface) throws IOException; + + protected abstract E doInitEngine(UnidirectionalRawPacketChannelContext<E> channelContext, String device) throws IOException; + + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); + if (connParams == null) { + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + UnidirectionalRawPacketChannelContext<E> channelContext = null; + + String device; + NetworkInterface networkInterface; + SocketAddress localAddress = connParams.getBindAddress(); + MACAddress localMac; + try { + if (localAddress == null || ANY_SOCKET.equals(localAddress)) { + networkInterface = findFirstUpInterface(); + InetAddress inetAddress = networkInterface.getInetAddresses().nextElement(); + int port = nextFreeLocalPort(); + localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), port); + } else { + networkInterface = findInterface(localAddress.getIp()); + } + + byte[] localHwAddress = networkInterface.getHardwareAddress(); + if (localHwAddress == null) { + localHwAddress = ZERO_MAC; + } + localMac = new MACAddress(localHwAddress); + device = resolveDevice(networkInterface); + LOGGER.debug("Found device " + device + " for ifc " + networkInterface); + } catch (IOException ex) { + doCatchException(channelContext, ex); + return; + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); + channelContext = new UnidirectionalRawPacketChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); + sessions.put(sessionInfo, channelContext); + + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + try { + E engine = doInitEngine(channelContext, device); + channelContext.setEngine(engine); + channelContext.setDevice(device); + handler.channelActive(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + } + + protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext) throws IOException; + + private void doWrite(SessionInfo sessionInfo) { + UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo); + if (channelContext == null) { + LOGGER.debug("Unable to find context for session {}.", sessionInfo); + return; + } + + Queue<byte[]> queue = channelContext.dataQueue; + if (queue.isEmpty()) { + return; + } + + EmitterHandler handler = channelContext.getHandler(); + handler.dataWriteStart(channelContext); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + int written = 0; + try { + written = doWrite0(channelContext); + } catch (Exception e) { + doCatchException(channelContext, e); + doClose(channelContext); + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + try { + handler.dataWritten(channelContext); + LOGGER.debug("Write handled."); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + } + + protected void doClose(SessionInfo sessionInfo) { + UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo); + if (channelContext == null) { + LOGGER.debug("Unable to find context for session {}.", sessionInfo); + return; + } + + doClose(channelContext); + } + + protected abstract void doClose0(UnidirectionalRawPacketChannelContext<E> channelContext); + + private void doClose(UnidirectionalRawPacketChannelContext<E> channelContext) { + SessionInfo sessionInfo = channelContext.getSessionInfo(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closing session '" + sessionInfo + "'."); + } + + EmitterHandler handler = channelContext.getHandler(); + portPool.release(sessionInfo.getSrcPort()); + + doClose0(channelContext); + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + sessions.remove(sessionInfo); + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + } + + @Override + public void run() { + while (working) { + UnidirectionalTasks.UnidirectionalTask task = null; + try { + task = tasks.take(); + } catch (InterruptedException ignore) { + + } + + if (task != null) { + switch (task.code) { + case Task.CONNECT: + UnidirectionalTasks.ConnectTask connectTask = (UnidirectionalTasks.ConnectTask) task; + doConnect(connectTask.sessionInfo, connectTask.handler); + break; + case Task.FLUSH: + UnidirectionalTasks.FlushTask flushTask = (UnidirectionalTasks.FlushTask) task; + doWrite(flushTask.sessionInfo); + break; + case Task.CLOSE: + UnidirectionalTasks.CloseTask closeTask = (UnidirectionalTasks.CloseTask) task; + doClose(closeTask.sessionInfo); + break; + } + } + + } + } + + private static class PcapInstance { + + private final PcapOutput pcap; + + private int borrows; + + public PcapInstance(PcapOutput pcap) { + this.pcap = pcap; + } + } +}
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,109 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.net.MACAddress; -import com.passus.st.client.TestClientHandler; -import com.passus.st.client.TestClientHandler.ClientEvent; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.MapBasedMACAddressResolver; -import com.passus.st.emitter.PortPool; -import com.passus.st.emitter.SessionInfo; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.text.ParseException; -import java.util.Iterator; - -import static com.passus.st.client.TestClientHandler.EventType.*; -import static org.testng.Assert.assertTrue; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; - -public class UnidirectionalPcapWorkerTest { - - private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); - - private final TestPortPool portPool = new TestPortPool(); - - @BeforeClass - public void beforeClass() throws ParseException { - macResolver.add("1.1.1.1 -> 11:11:11:11:11:11"); - macResolver.add("2.2.2.2 -> 22:22:22:22:22:22"); - } - - private void startWorker(UnidirectionalPcapWorker worker) { - worker.start(); - try { - Thread.sleep(200); - } catch (InterruptedException e) { - - } - } - - private void stopWorker(UnidirectionalPcapWorker worker) { - worker.working = false; - worker.interrupt(); - try { - worker.join(); - } catch (InterruptedException ignore) { - - } - } - - @Test - public void testConnect() throws Exception { - UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker(); - try { - worker.setMacResolver(macResolver); - worker.setPortPool(portPool); - worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER); - startWorker(worker); - - TestClientHandler handler = new TestClientHandler(); - - SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80"); - worker.connect(sessionInfo, handler); - handler.waitForEvent(CHANNEL_REGISTERED); - assertTrue(portPool.borrowed); - - worker.close(sessionInfo); - handler.waitForEvent(CHANNEL_UNREGISTERED); - - UnidirectionalPcapChannelContext channelContext = (UnidirectionalPcapChannelContext) handler.getChannelContext(); - assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort()); - assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress()); - - Iterator<ClientEvent> it = handler.events().iterator(); - assertEquals(CHANNEL_REGISTERED, it.next().getType()); - assertEquals(CHANNEL_ACTIVE, it.next().getType()); - assertEquals(CHANNEL_INACTIVE, it.next().getType()); - assertEquals(CHANNEL_UNREGISTERED, it.next().getType()); - assertFalse(portPool.borrowed); - } finally { - stopWorker(worker); - } - } - - static class TestPortPool implements PortPool { - - public static final int PORT = 100; - - boolean borrowed = false; - - @Override - public int borrow() { - if (borrowed) { - return -1; - } - - borrowed = true; - return PORT; - } - - @Override - public void release(int port) { - if (borrowed) { - borrowed = false; - } - } - } -} \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java Fri Apr 24 12:32:45 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,90 +0,0 @@ -package com.passus.st.emitter.pcap; - -import com.passus.data.HeapByteBuff; -import com.passus.net.session.Session; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.client.TestClientHandler; -import com.passus.st.emitter.AbstractEmitterTest; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.MapBasedMACAddressResolver; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; -import org.testng.AssertJUnit; -import org.testng.annotations.Test; - -public class UnidirectionalUdpPcapEmitterTest extends AbstractWireMockTest { - - public UnidirectionalUdpPcapEmitter createEmitter(SessionMapper mapper) throws Exception { - - MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); - macResolver.add("* -> 00:00:00:00:00:00"); - - UnidirectionalUdpPcapEmitter emitter = new UnidirectionalUdpPcapEmitter(); - emitter.setWorkersNum(1); - emitter.setMacResolver(macResolver); - if (mapper != null) { - emitter.setSessionMapper(mapper); - } - return emitter; - } - - @Test - public void testConnectAndClose() throws Exception { - UnidirectionalUdpPcapEmitter emitter = createEmitter(null); - try { - emitter.start(); - SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP); - - TestClientHandler handler = new TestClientHandler() { - @Override - protected void doChannelActive(ChannelContext context) throws Exception { - context.close(); - } - }; - - emitter.connect(info, handler, 0); - AbstractEmitterTest.waitConn(handler, 4); - - AssertJUnit.assertEquals(4, handler.size()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(2).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(3).getType()); - } finally { - emitter.stop(); - } - } - - @Test - public void testSend() throws Exception { - final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0}; - - UnidirectionalUdpPcapEmitter emitter = createEmitter(null); - emitter.start(); - try { - SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP); - - TestClientHandler handler = new TestClientHandler() { - - @Override - protected void doChannelActive(ChannelContext context) throws Exception { - context.writeAndFlush(new HeapByteBuff(ETH)); - context.close(); - } - }; - - emitter.connect(info, handler, 0); - AbstractEmitterTest.waitConn(handler, 5, 5_000); - - AssertJUnit.assertEquals(5, handler.size()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.DATA_WRITTEN, handler.get(2).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(3).getType()); - AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(4).getType()); - } finally { - emitter.stop(); - } - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,91 @@ +package com.passus.st.emitter.raw; + +import com.passus.data.HeapByteBuff; +import com.passus.net.session.Session; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.client.TestClientHandler; +import com.passus.st.emitter.AbstractEmitterTest; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.MapBasedMACAddressResolver; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.raw.PcapUnidirectionalRawPacketEmitter; +import org.testng.AssertJUnit; +import org.testng.annotations.Test; + +public class PcapUnidirectionalRawPacketEmitterTest extends AbstractWireMockTest { + + public PcapUnidirectionalRawPacketEmitter createEmitter(SessionMapper mapper) throws Exception { + + MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); + macResolver.add("* -> 00:00:00:00:00:00"); + + PcapUnidirectionalRawPacketEmitter emitter = new PcapUnidirectionalRawPacketEmitter(); + emitter.setWorkersNum(1); + emitter.setMacResolver(macResolver); + if (mapper != null) { + emitter.setSessionMapper(mapper); + } + return emitter; + } + + @Test + public void testConnectAndClose() throws Exception { + PcapUnidirectionalRawPacketEmitter emitter = createEmitter(null); + try { + emitter.start(); + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP); + + TestClientHandler handler = new TestClientHandler() { + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.close(); + } + }; + + emitter.connect(info, handler, 0); + AbstractEmitterTest.waitConn(handler, 4); + + AssertJUnit.assertEquals(4, handler.size()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(2).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(3).getType()); + } finally { + emitter.stop(); + } + } + + @Test + public void testSend() throws Exception { + final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0}; + + PcapUnidirectionalRawPacketEmitter emitter = createEmitter(null); + emitter.start(); + try { + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP); + + TestClientHandler handler = new TestClientHandler() { + + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.writeAndFlush(new HeapByteBuff(ETH)); + context.close(); + } + }; + + emitter.connect(info, handler, 0); + AbstractEmitterTest.waitConn(handler, 5, 5_000); + + AssertJUnit.assertEquals(5, handler.size()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.DATA_WRITTEN, handler.get(2).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(3).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(4).getType()); + } finally { + emitter.stop(); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorkerTest.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,109 @@ +package com.passus.st.emitter.raw; + +import com.passus.net.MACAddress; +import com.passus.st.client.TestClientHandler; +import com.passus.st.client.TestClientHandler.ClientEvent; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.MapBasedMACAddressResolver; +import com.passus.st.emitter.PortPool; +import com.passus.st.emitter.SessionInfo; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.text.ParseException; +import java.util.Iterator; + +import static com.passus.st.client.TestClientHandler.EventType.*; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; + +public class PcapUnidirectionalRawPacketWorkerTest { + + private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); + + private final TestPortPool portPool = new TestPortPool(); + + @BeforeClass + public void beforeClass() throws ParseException { + macResolver.add("1.1.1.1 -> 11:11:11:11:11:11"); + macResolver.add("2.2.2.2 -> 22:22:22:22:22:22"); + } + + private void startWorker(PcapUnidirectionalRawPacketWorker worker) { + worker.start(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + + } + } + + private void stopWorker(PcapUnidirectionalRawPacketWorker worker) { + worker.working = false; + worker.interrupt(); + try { + worker.join(); + } catch (InterruptedException ignore) { + + } + } + + @Test + public void testConnect() throws Exception { + PcapUnidirectionalRawPacketWorker worker = new PcapUnidirectionalRawPacketWorker(); + try { + worker.setMacResolver(macResolver); + worker.setPortPool(portPool); + worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER); + startWorker(worker); + + TestClientHandler handler = new TestClientHandler(); + + SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80"); + worker.connect(sessionInfo, handler); + handler.waitForEvent(CHANNEL_REGISTERED); + assertTrue(portPool.borrowed); + + worker.close(sessionInfo); + handler.waitForEvent(CHANNEL_UNREGISTERED); + + UnidirectionalRawPacketChannelContext channelContext = (UnidirectionalRawPacketChannelContext) handler.getChannelContext(); + assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort()); + assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress()); + + Iterator<ClientEvent> it = handler.events().iterator(); + assertEquals(CHANNEL_REGISTERED, it.next().getType()); + assertEquals(CHANNEL_ACTIVE, it.next().getType()); + assertEquals(CHANNEL_INACTIVE, it.next().getType()); + assertEquals(CHANNEL_UNREGISTERED, it.next().getType()); + assertFalse(portPool.borrowed); + } finally { + stopWorker(worker); + } + } + + static class TestPortPool implements PortPool { + + public static final int PORT = 100; + + boolean borrowed = false; + + @Override + public int borrow() { + if (borrowed) { + return -1; + } + + borrowed = true; + return PORT; + } + + @Override + public void release(int port) { + if (borrowed) { + borrowed = false; + } + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/NioServer.java Mon Apr 27 12:36:42 2020 +0200 @@ -0,0 +1,187 @@ +package com.passus.st.utils.server; + +import com.passus.commons.service.Service; +import com.passus.commons.service.ServiceException; +import com.passus.st.utils.NioConnectionsCounter; +import com.passus.st.utils.TestSslUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.ssl.SslHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jboss.netty.util.internal.SystemPropertyUtil; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; + +public class NioServer implements Service { + + private static final Logger LOGGER = LogManager.getLogger(NioServer.class); + + private String address = "localhost"; + + private int port = 5000; + + private boolean ssl = false; + + private ChannelFuture channel; + + private EventLoopGroup masterGroup; + + private EventLoopGroup slaveGroup; + + private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter(); + + private ChannelHandler serverHandler; + + private boolean started; + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public boolean isSsl() { + return ssl; + } + + public void setSsl(boolean ssl) { + this.ssl = ssl; + } + + public ChannelHandler getServerHandler() { + return serverHandler; + } + + public void setServerHandler(ChannelHandler serverHandler) { + this.serverHandler = serverHandler; + } + + public int getConnections() { + return connectionsCounter.getConnections(); + } + + public int getMaxConnections() { + return connectionsCounter.getMaxConnections(); + } + + public void setMaxConnections(int maxConnections) { + connectionsCounter.setMaxConnections(maxConnections); + } + + @Override + public boolean isStarted() { + return started; + } + + private SSLContext createSSLContext() { + String algorithm = SystemPropertyUtil.get("ssl.KeyManagerFactory.algorithm"); + if (algorithm == null) { + algorithm = "SunX509"; + } + + SSLContext serverContext; + try { + TrustManagerFactory trustManagerFactory = TestSslUtils.loadTrustManagerFactory(); + + + // Initialize the SSLContext to work with our key managers. + serverContext = SSLContext.getInstance("TLS"); + //serverContext.init(trustManagerFactory.get(), null, null); + } catch (Exception e) { + throw new Error( + "Failed to initialize the server-side SSLContext", e); + } + + return serverContext; + } + + @Override + public void start() { + if (started) { + return; + } + + try { + masterGroup = new NioEventLoopGroup(); + slaveGroup = new NioEventLoopGroup(); + ServerBootstrap bs = new ServerBootstrap(); + bs.option(ChannelOption.SO_BACKLOG, 1024); + + bs.group(masterGroup, slaveGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); + ctx.close(); + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + + if (ssl) { + SSLEngine engine = createSSLContext().createSSLEngine(); + engine.setUseClientMode(false); + engine.setNeedClientAuth(false); + p.addLast("ssl", new SslHandler(engine)); + } + + p.addLast(connectionsCounter); + p.addLast(serverHandler); + } + + }); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Starting Netty server {}:{}.", address, port); + } + + channel = bs.bind(address, port); + this.started = true; + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + stop(); + throw new ServiceException(ex.getMessage(), ex); + } + } + + @Override + public void stop() { + if (!started) { + return; + } + + slaveGroup.shutdownGracefully(); + masterGroup.shutdownGracefully(); + + try { + LOGGER.debug("Stopping Netty server {}:{}.", address, port); + channel.channel().closeFuture().sync(); + } catch (InterruptedException ignore) { + } + + slaveGroup = null; + masterGroup = null; + channel = null; + + started = false; + } +}