Mercurial > stress-tester
changeset 1001:7402a22cba53
UnidirectionalUdpPcapEmitter in progress
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterUtils.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,152 @@ +package com.passus.st.emitter; + +import com.passus.net.Ip4Address; +import com.passus.net.IpAddress; +import com.passus.net.IpSubnet; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import com.passus.st.emitter.socket.AbstractChannelContext; +import org.apache.logging.log4j.Logger; + +import java.net.InetAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.List; + +import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; + +public class EmitterUtils { + + public static final String LOOPBACK_DEVICE_NAME = "lo"; + + private EmitterUtils() { + } + + public static final ConnectionParams getConnectionParams(SessionInfo sessionInfo, EmitterHandler handler, + SessionMapper sessionMapper, + boolean collectMetrics, EmitterMetric metric, + Logger logger) { + ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to map session '{}'.", sessionInfo); + } + + if (collectMetrics) { + synchronized (metric) { + metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); + } + } + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + return null; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + return connParams; + } + + public static void doCatchException(ChannelContext channelContext, Throwable cause, + EmitterHandler handler, + boolean collectMetrics, EmitterMetric metric, + Logger logger) { + if (logger.isDebugEnabled()) { + logger.debug("Error occurred. " + cause.getMessage(), cause); + } + + if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + } + + try { + handler.errorOccurred(channelContext, cause); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + public static NetworkInterface findFirstUpNetworkInterface() throws SocketException { + Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); + while (nets.hasMoreElements()) { + NetworkInterface ni = nets.nextElement(); + Enumeration<InetAddress> addrs = ni.getInetAddresses(); + while (addrs.hasMoreElements()) { + InetAddress addr = addrs.nextElement(); + if (addr.isLoopbackAddress() || addr.isLinkLocalAddress()) { + break; + } + + + return ni; + } + } + + return null; + } + + public static NetworkInterface findNetworkInterface(String address) throws SocketException { + return findNetworkInterface(IpAddress.parse(address)); + } + + public static NetworkInterface findNetworkInterface(IpAddress address) throws SocketException { + Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); + while (nets.hasMoreElements()) { + NetworkInterface ni = nets.nextElement(); + List<InterfaceAddress> addrs = ni.getInterfaceAddresses(); + for (InterfaceAddress ifcAddr : addrs) { + IpAddress nAddr = AddressUtils.getIpAddress(ifcAddr.getAddress().getAddress()); + if (nAddr.getVersion() == IpAddress.IP4) { + IpSubnet subnet = IpSubnet.fromCidr(nAddr.getAddress()[0], ifcAddr.getNetworkPrefixLength()); + if (subnet.inRange((Ip4Address) address)) { + return ni; + } + } + } + } + + return null; + } + + public static void displayInterfaceInformation(NetworkInterface ni) throws SocketException { + System.out.printf("Display name: %s\n", ni.getDisplayName()); + System.out.printf("Name: %s\n", ni.getName()); + List<InterfaceAddress> ifcAddresses = ni.getInterfaceAddresses(); + for (InterfaceAddress ifcAddress : ifcAddresses) { + if (ifcAddress.getAddress().isLinkLocalAddress()) { + continue; + } + + System.out.printf("IfcAddress: %s\n", ifcAddress); + } + + System.out.printf("\n"); + } + + public static void main(String[] args) throws SocketException { + NetworkInterface ni = findNetworkInterface("192.168.2.20"); + if (ni != null) { + displayInterfaceInformation(ni); + } else { + System.out.println("Not found."); + } + + ni = findFirstUpNetworkInterface(); + if (ni != null) { + displayInterfaceInformation(ni); + } else { + System.out.println("Not found."); + } + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java Fri Oct 04 12:40:06 2019 +0200 @@ -2,21 +2,6 @@ public class StatelessTasks { - protected static class Task { - - public static final int CONNECT = 1; - public static final int READ = 2; - public static final int FLUSH = 3; - public static final int CLOSE = 4; - - final int code; - - public Task(int code) { - this.code = code; - } - - } - public static final Task CONNECT_TASK = new Task(Task.CONNECT); public static final Task READ_TASK = new Task(Task.READ); public static final Task FLUSH_TASK = new Task(Task.FLUSH);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,134 @@ +package com.passus.st.emitter.pcap; + +import com.passus.data.ByteBuff; +import com.passus.net.MACAddress; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask; +import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> { + + private final UnidirectionalPcapWorker worker; + + private final Queue<byte[]> dataQueue; + + private K attachment; + + private final EmitterHandler handler; + + private SessionInfo sessionInfo; + + private final SocketAddress localAddress; + + private final SocketAddress remoteAddress; + + private final MACAddress localHardwareAddress; + + private final MACAddress remoteHardwareAddress; + + public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker, + EmitterHandler handler, + SessionInfo sessionInfo, + SocketAddress localAddress, SocketAddress remoteAddress, + MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) { + this.worker = worker; + this.handler = handler; + this.dataQueue = new LinkedList<>(); + this.sessionInfo = sessionInfo; + this.localAddress = localAddress; + this.remoteAddress = remoteAddress; + this.localHardwareAddress = localHardwareAddress; + this.remoteHardwareAddress = remoteHardwareAddress; + } + + @Override + public SocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + + public EmitterHandler getHandler() { + return handler; + } + + public MACAddress getLocalHardwareAddress() { + return localHardwareAddress; + } + + public MACAddress getRemoteHardwareAddress() { + return remoteHardwareAddress; + } + + @Override + public void setAttachment(K attachment) { + this.attachment = attachment; + } + + @Override + public K getAttachment() { + return attachment; + } + + @Override + public boolean isBidirectional() { + return false; + } + + @Override + public void setBidirectional(boolean bidirectional) { + if (bidirectional) { + throw new IllegalArgumentException("Bidirectional communication not supported."); + } + } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public boolean isConnectionPending() { + return false; + } + + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + byte[] out = new byte[length]; + System.arraycopy(data, offset, out, 0, length); + dataQueue.add(out); + } + + @Override + public void write(ByteBuff data) throws IOException { + dataQueue.add(data.toArray()); + } + + @Override + public void flush() throws IOException { + worker.flush(sessionInfo); + } + + @Override + public void close() throws IOException { + worker.close(sessionInfo); + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,270 @@ +package com.passus.st.emitter.pcap; + +import com.passus.commons.Assert; +import com.passus.net.IpAddress; +import com.passus.net.MACAddress; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.*; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask; +import com.passus.st.emitter.pcap.UnidirectionalTasks.ConnectTask; +import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask; +import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask; +import com.passus.st.metric.MetricSource; +import com.passus.st.metric.MetricsContainer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress; +import static com.passus.st.emitter.EmitterUtils.getConnectionParams; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; + +class UnidirectionalPcapWorker extends Thread implements MetricSource { + + protected final Logger LOGGER = LogManager.getLogger(UnidirectionalPcapWorker.class); + + boolean working = true; + + protected final BlockingQueue<UnidirectionalTask> tasks = new LinkedBlockingQueue<>(); + + private Map<SessionInfo, UnidirectionalPcapChannelContext> sessions = new HashMap<>(); + + private SessionMapper sessionMapper; + + private volatile boolean collectMetrics = false; + + private EmitterMetric metric; + + private PortPool portPool = new PortPoolImpl(); + + private MACAddressResolver macResolver; + + public PortPool getPortPool() { + return portPool; + } + + public void setPortPool(PortPool portPool) { + Assert.notNull(portPool, "portPool"); + this.portPool = portPool; + } + + public MACAddressResolver getMacResolver() { + return macResolver; + } + + public void setMacResolver(MACAddressResolver macResolver) { + Assert.notNull(macResolver, "macResolver"); + this.macResolver = macResolver; + } + + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public void setSessionMapper(SessionMapper sessionMapper) { + Assert.notNull(sessionMapper, "sessionMapper"); + this.sessionMapper = sessionMapper; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + private void doCatchException(ChannelContext channelContext, EmitterHandler handler, Throwable cause) { + EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER); + } + + private NetworkInterface findFirstUpInterface() throws IOException { + NetworkInterface ni = EmitterUtils.findFirstUpNetworkInterface(); + if (ni == null) { + throw new IOException("Unable to find network interface."); + } + + return ni; + } + + private NetworkInterface findInterface(IpAddress address) throws IOException { + NetworkInterface ni = EmitterUtils.findNetworkInterface(address); + if (ni == null) { + throw new IOException("Unable to find network interface."); + } + + return ni; + } + + boolean containsContext(SessionInfo sessionInfo) { + return sessions.containsKey(sessionInfo); + } + + private UnidirectionalPcapChannelContext findContext(SessionInfo sessionInfo) { + UnidirectionalPcapChannelContext context = sessions.get(sessionInfo); + if (context == null) { + throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo); + } + + return context; + } + + private int nextFreeLocalPort() throws IOException { + int port = portPool.borrow(); + if (port == -1) { + throw new IOException("Bind error. Unable to get local port."); + } + + return port; + } + + void connect(SessionInfo sessionInfo, EmitterHandler handler) { + tasks.add(new ConnectTask(sessionInfo, handler)); + } + + void flush(SessionInfo sessionInfo) { + tasks.add(new FlushTask(sessionInfo)); + } + + void close(SessionInfo sessionInfo) { + tasks.add(new CloseTask(sessionInfo)); + } + + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); + if (connParams == null) { + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + UnidirectionalPcapChannelContext channelContext = null; + + String device; + NetworkInterface networkInterface; + SocketAddress localAddress = connParams.getBindAddress(); + MACAddress localMac; + try { + if (localAddress == null || ANY_SOCKET.equals(localAddress)) { + networkInterface = findFirstUpInterface(); + InetAddress inetAddress = networkInterface.getInetAddresses().nextElement(); + int port = nextFreeLocalPort(); + localAddress = new SocketAddress(jdkInetAddresToIpAddress(inetAddress), port); + } else { + networkInterface = findInterface(localAddress.getIp()); + } + + localMac = new MACAddress(networkInterface.getHardwareAddress()); + device = networkInterface.getName(); + } catch (IOException ex) { + doCatchException(channelContext, handler, ex); + return; + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); + channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); + + sessions.put(sessionInfo, channelContext); + + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, handler, ex); + } + + try { + handler.channelActive(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, handler, ex); + } + } + + private void doClose(SessionInfo sessionInfo) { + UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo); + if (channelContext == null) { + LOGGER.debug("Unable to find context for session " + sessionInfo + "."); + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closing session '" + sessionInfo + "'."); + } + + EmitterHandler handler = channelContext.getHandler(); + portPool.release(sessionInfo.getSrcPort()); + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + sessions.remove(sessionInfo); + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + } + + @Override + public void run() { + while (working) { + UnidirectionalTask task = null; + try { + task = tasks.take(); + } catch (InterruptedException ignore) { + + } + + if (task != null) { + switch (task.code) { + case Task.CONNECT: + ConnectTask connectTask = (ConnectTask) task; + doConnect(connectTask.sessionInfo, connectTask.handler); + break; + /* case Task.FLUSH: + doWrite(); + break;*/ + case Task.CLOSE: + CloseTask closeTask = (CloseTask) task; + doClose(closeTask.sessionInfo); + break; + } + } + + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalTasks.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,45 @@ +package com.passus.st.emitter.pcap; + +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.Task; + +public class UnidirectionalTasks { + + protected static class UnidirectionalTask extends Task { + + final SessionInfo sessionInfo; + + public UnidirectionalTask(int code, SessionInfo sessionInfo) { + super(code); + this.sessionInfo = sessionInfo; + } + } + + protected final static class ConnectTask extends UnidirectionalTask { + + final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, final EmitterHandler handler) { + super(CONNECT, sessionInfo); + this.handler = handler; + } + + } + + protected final static class CloseTask extends UnidirectionalTask { + + public CloseTask(SessionInfo sessionInfo) { + super(CLOSE, sessionInfo); + } + + } + + protected final static class FlushTask extends UnidirectionalTask { + + public FlushTask(SessionInfo sessionInfo) { + super(FLUSH, sessionInfo); + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,133 @@ +package com.passus.st.emitter.pcap; + +import com.passus.commons.Assert; +import com.passus.st.emitter.*; +import com.passus.st.metric.MetricsContainer; + +import java.io.IOException; + +public class UnidirectionalUdpPcapEmitter implements Emitter { + + private static final int DEFAULT_WORKERS_NUM = 4; + + private boolean started = false; + + private MACAddressResolver macAddressResolver; + + private UnidirectionalPcapWorker[] workers; + + private int workersNum = DEFAULT_WORKERS_NUM; + + private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl()); + + private MACAddressResolver macResolver; + + private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; + + private boolean collectMetrics = false; + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public int getWorkersNum() { + return workersNum; + } + + public void setWorkersNum(int workersNum) { + Assert.greaterThanZero(workersNum, "workersNum"); + this.workersNum = workersNum; + } + + public MACAddressResolver getMacAddressResolver() { + return macAddressResolver; + } + + public void setMacAddressResolver(MACAddressResolver macAddressResolver) { + Assert.notNull(macAddressResolver); + this.macAddressResolver = macAddressResolver; + } + + public PortPool getPortPool() { + return portPool; + } + + public void setPortPool(PortPool portPool) { + Assert.notNull(portPool, "portPool"); + this.portPool = new SynchronizedPortPoolWrapper(portPool); + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public void writeMetrics(MetricsContainer container) { + throw new RuntimeException("Not implemented yet."); + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (started) { + return; + } + + workers = new UnidirectionalPcapWorker[workersNum]; + for (int i = 0; i < workersNum; i++) { + UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker(); + worker.setPortPool(portPool); + worker.setMacResolver(macResolver); + worker.start(); + workers[i] = worker; + } + + started = true; + } + + @Override + public void stop() { + if (!started) { + return; + } + + for (int i = 0; i < workersNum; i++) { + UnidirectionalPcapWorker worker = workers[i]; + worker.working = false; + worker.interrupt(); + + try { + worker.join(); + } catch (InterruptedException ignore) { + + } + } + + workers = null; + started = false; + } + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { + int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; + int index = hashCode % workers.length; + workers[index].connect(sessionInfo, handler); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Fri Oct 04 12:40:06 2019 +0200 @@ -9,8 +9,8 @@ import java.util.LinkedList; import java.util.Queue; -import static com.passus.st.emitter.socket.Connection.CLOSE_TASK; -import static com.passus.st.emitter.socket.Connection.FLUSH_TASK; +import static com.passus.st.emitter.StatelessTasks.CLOSE_TASK; +import static com.passus.st.emitter.StatelessTasks.FLUSH_TASK; public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Fri Oct 04 12:40:06 2019 +0200 @@ -3,10 +3,7 @@ import com.passus.data.ByteBuff; import com.passus.data.HeapByteBuff; import com.passus.net.SocketAddress; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.EmitterMetric; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.*; import com.passus.st.emitter.SessionMapper.ConnectionParams; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -16,8 +13,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; - public abstract class Connection extends Thread { protected final Logger logger = LogManager.getLogger(getClass()); @@ -59,50 +54,11 @@ public abstract boolean isConnectionPending(); protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) { - ConnectionParams connParams = sessionMapper.map(sessionInfo); - if (connParams == null) { - if (logger.isDebugEnabled()) { - logger.debug("Unable to map session '{}'.", sessionInfo); - } - - if (collectMetrics) { - synchronized (metric) { - metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); - } - } - - try { - handler.sessionInvalidated(sessionInfo); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - return null; - } - - if (logger.isDebugEnabled()) { - logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); - } - - return connParams; + return EmitterUtils.getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, logger); } protected void doCatchException(AbstractChannelContext channelContext, Throwable cause) { - if (logger.isDebugEnabled()) { - logger.debug("Error occurred. " + cause.getMessage(), cause); - } - - if (collectMetrics) { - synchronized (metric) { - metric.errorCaught(cause); - } - } - - try { - handler.errorOccurred(channelContext, cause); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } + EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, logger); } public abstract void connect(); @@ -214,24 +170,5 @@ } } - protected static class Task { - - public static final int CONNECT = 1; - public static final int READ = 2; - public static final int FLUSH = 3; - public static final int CLOSE = 4; - - final int code; - - public Task(int code) { - this.code = code; - } - - } - - static final Task CONNECT_TASK = new Task(Task.CONNECT); - static final Task READ_TASK = new Task(Task.READ); - static final Task FLUSH_TASK = new Task(Task.FLUSH); - static final Task CLOSE_TASK = new Task(Task.CLOSE); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Fri Oct 04 12:40:06 2019 +0200 @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -63,7 +64,7 @@ } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + LOGGER.debug("Registering TCP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); } try { @@ -107,13 +108,19 @@ connectionPending = false; } - @Override - public void close() { + private void closeQuietly(Closeable c) { try { - socket.close(); + c.close(); } catch (Exception ignore) { } + } + + @Override + public void close() { + closeQuietly(socket); + closeQuietly(in); + closeQuietly(out); if (collectMetrics) { metric.incClosedConnections();
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Fri Oct 04 12:40:06 2019 +0200 @@ -89,7 +89,7 @@ connections.forEach((sessionInfo, conn) -> { if (conn.isConnected()) { try { - conn.tasks.put(Connection.CLOSE_TASK); + conn.tasks.put(StatelessTasks.CLOSE_TASK); } catch (InterruptedException e) { } @@ -155,7 +155,7 @@ throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + "."); } - connection.tasks.add(Connection.CONNECT_TASK); + connection.tasks.add(StatelessTasks.CONNECT_TASK); connection.start(); connections.put(sessionInfo, connection); }
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Fri Oct 04 12:40:06 2019 +0200 @@ -1,15 +1,15 @@ package com.passus.st.client; -import com.passus.st.emitter.ChannelContext; import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; + import java.util.LinkedList; import java.util.Queue; /** - * * @author Mirosław Hawrot */ public class TestClientHandler implements EmitterHandler { @@ -25,8 +25,12 @@ USER } + public static final int DEFAULT_TIMEOUT = 5_000; + private final LinkedList<ClientEvent> events = new LinkedList<>(); + private ChannelContext channelContext; + private boolean channelUnregistered = false; public boolean isChannelUnregistered() { @@ -34,48 +38,99 @@ } public void clear() { - channelUnregistered = false; - events.clear(); + synchronized (this) { + channelUnregistered = false; + events.clear(); + channelContext = null; + } + } + + public ChannelContext getChannelContext() { + synchronized (this) { + return channelContext; + } } public Queue<ClientEvent> events() { - return new LinkedList<>(events); + synchronized (this) { + return new LinkedList<>(events); + } + } + + public ClientEvent findFirst(EventType eventType) { + synchronized (this) { + for (ClientEvent event : events) { + if (event.getType() == eventType) { + return event; + } + } + + return null; + } + } + + public boolean contains(EventType eventType) { + synchronized (this) { + for (ClientEvent event : events) { + if (event.getType() == eventType) { + return true; + } + } + + return false; + } } public ClientEvent get(int index) { - return events.get(index); + synchronized (this) { + return events.get(index); + } } public int size() { - return events.size(); + synchronized (this) { + return events.size(); + } } public boolean isEmpty() { - return events.isEmpty(); + synchronized (this) { + return events.isEmpty(); + } } public ClientEvent getFirst() { - return events.getFirst(); + synchronized (this) { + return events.getFirst(); + } } public ClientEvent getLast() { - return events.getLast(); + synchronized (this) { + return events.getLast(); + } } public void add(ClientEvent event) { - events.add(event); + synchronized (this) { + events.add(event); + } } public void add(EventType type, ChannelContext context) { - events.add(ClientEvent.create(type, context)); + synchronized (this) { + events.add(ClientEvent.create(type, context)); + } } @Override public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { - ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context); - event.setCause(cause); - add(event); - doErrorOccured(context, cause); + synchronized (this) { + ClientEvent event = ClientEvent.create(EventType.ERROR_OCCURED, context); + event.setCause(cause); + add(event); + doErrorOccured(context, cause); + } } protected void doErrorOccured(ChannelContext context, Throwable cause) throws Exception { @@ -84,9 +139,11 @@ @Override public final void dataWritten(ChannelContext context) throws Exception { - ClientEvent event = ClientEvent.create(EventType.DATA_WRITTEN, context); - add(event); - doDataWritten(context); + synchronized (this) { + ClientEvent event = ClientEvent.create(EventType.DATA_WRITTEN, context); + add(event); + doDataWritten(context); + } } protected void doDataWritten(ChannelContext context) throws Exception { @@ -95,10 +152,12 @@ @Override public final void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context); - event.setData(data); - add(event); - doDataReceived(context, data); + synchronized (this) { + ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context); + event.setData(data); + add(event); + doDataReceived(context, data); + } } protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception { @@ -107,9 +166,12 @@ @Override public final void channelInactive(ChannelContext context) throws Exception { - ClientEvent event = ClientEvent.create(EventType.CHANNEL_INACTIVE, context); - add(event); - doChannelInactive(context); + synchronized (this) { + channelContext = context; + ClientEvent event = ClientEvent.create(EventType.CHANNEL_INACTIVE, context); + add(event); + doChannelInactive(context); + } } protected void doChannelInactive(ChannelContext context) throws Exception { @@ -118,9 +180,11 @@ @Override public final void channelActive(ChannelContext context) throws Exception { - ClientEvent event = ClientEvent.create(EventType.CHANNEL_ACTIVE, context); - add(event); - doChannelActive(context); + synchronized (this) { + ClientEvent event = ClientEvent.create(EventType.CHANNEL_ACTIVE, context); + add(event); + doChannelActive(context); + } } protected void doChannelActive(ChannelContext context) throws Exception { @@ -129,10 +193,12 @@ @Override public final void channelUnregistered(ChannelContext context) throws Exception { - channelUnregistered = true; - ClientEvent event = ClientEvent.create(EventType.CHANNEL_UNREGISTERED, context); - add(event); - doChannelUnregistered(context); + synchronized (this) { + channelUnregistered = true; + ClientEvent event = ClientEvent.create(EventType.CHANNEL_UNREGISTERED, context); + add(event); + doChannelUnregistered(context); + } } protected void doChannelUnregistered(ChannelContext context) throws Exception { @@ -141,15 +207,39 @@ @Override public void channelRegistered(ChannelContext context) throws Exception { - ClientEvent event = ClientEvent.create(EventType.CHANNEL_REGISTERED, context); - add(event); - doChannelRegistered(context); + synchronized (this) { + ClientEvent event = ClientEvent.create(EventType.CHANNEL_REGISTERED, context); + add(event); + doChannelRegistered(context); + } } protected void doChannelRegistered(ChannelContext context) throws Exception { } + public void waitForEvent(EventType type) { + waitForEvent(type, DEFAULT_TIMEOUT); + } + + public void waitForEvent(EventType type, long timeout) { + synchronized (this) { + long start = System.currentTimeMillis(); + while (!contains(type)) { + try { + wait(5); + } catch (InterruptedException ignore) { + + } + + long now = System.currentTimeMillis(); + if (now - start > timeout) { + throw new RuntimeException("Wait timeout exceeded. Unable to find event '" + type + "'."); + } + } + } + } + public static class ClientEvent { private final EventType type; @@ -205,7 +295,12 @@ @Override public String toString() { - return "ClientEvent{" + "type=" + type + ", sessionInfo=" + sessionInfo + ", localAddress=" + localAddress + ", remoteAddress=" + remoteAddress + ", data=" + data + ", cause=" + cause + '}'; + return "ClientEvent{type=" + type + ", " + + "sessionInfo=" + sessionInfo + ", " + + "localAddress=" + localAddress + ", " + + "remoteAddress=" + remoteAddress + ", " + + "data=" + data + ", " + + "cause=" + cause + '}'; } private static ClientEvent create(EventType type, ChannelContext context) {
--- a/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java Fri Oct 04 12:40:06 2019 +0200 @@ -125,7 +125,9 @@ RuleBasedSessionMapper mapper = new RuleBasedSessionMapper(); mapper.addRule(rule); List<Integer> localPorts = Arrays.asList(101, 102, 103); - List<SocketAddress> bindAddresses = localPorts.stream().map((lp) -> mapper.map(si(lp)).getBindAddress()).collect(Collectors.toList()); + List<SocketAddress> bindAddresses = localPorts.stream() + .map((lp) -> mapper.map(si(lp)).getBindAddress()) + .collect(Collectors.toList()); assertEquals(expected, bindAddresses); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,111 @@ +package com.passus.st.emitter.pcap; + +import com.passus.net.MACAddress; +import com.passus.st.client.TestClientHandler; +import com.passus.st.client.TestClientHandler.ClientEvent; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.MapBasedMACAddressResolver; +import com.passus.st.emitter.PortPool; +import com.passus.st.emitter.SessionInfo; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.text.ParseException; +import java.util.Iterator; + +import static com.passus.st.client.TestClientHandler.EventType.*; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; + +public class UnidirectionalPcapWorkerTest { + + + private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); + + private final TestPortPool portPool = new TestPortPool(); + + @BeforeClass + public void beforeClass() throws ParseException { + macResolver.add("1.1.1.1 -> 11:11:11:11:11:11"); + macResolver.add("2.2.2.2 -> 22:22:22:22:22:22"); + } + + private void startWorker(UnidirectionalPcapWorker worker) { + worker.start(); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + + } + } + + private void stopWorker(UnidirectionalPcapWorker worker) { + worker.working = false; + worker.interrupt(); + try { + worker.join(); + } catch (InterruptedException ignore) { + + } + } + + @Test + public void testConnect() throws Exception { + UnidirectionalPcapWorker worker = new UnidirectionalPcapWorker(); + try { + worker.setMacResolver(macResolver); + worker.setPortPool(portPool); + worker.setSessionMapper(Emitter.DEFAULT_SESSION_MAPPER); + startWorker(worker); + + TestClientHandler handler = new TestClientHandler(); + + SessionInfo sessionInfo = SessionInfo.parse("5.5.5.5:1000 -> 1.1.1.1:80"); + worker.connect(sessionInfo, handler); + handler.waitForEvent(CHANNEL_REGISTERED); + assertTrue(portPool.borrowed); + + worker.close(sessionInfo); + handler.waitForEvent(CHANNEL_UNREGISTERED); + + UnidirectionalPcapChannelContext channelContext = (UnidirectionalPcapChannelContext) handler.getChannelContext(); + assertEquals(TestPortPool.PORT, channelContext.getLocalAddress().getPort()); + assertEquals(new MACAddress("11:11:11:11:11:11"), channelContext.getRemoteHardwareAddress()); + + Iterator<ClientEvent> it = handler.events().iterator(); + assertEquals(CHANNEL_REGISTERED, it.next().getType()); + assertEquals(CHANNEL_ACTIVE, it.next().getType()); + assertEquals(CHANNEL_INACTIVE, it.next().getType()); + assertEquals(CHANNEL_UNREGISTERED, it.next().getType()); + assertFalse(portPool.borrowed); + } finally { + stopWorker(worker); + } + } + + + private class TestPortPool implements PortPool { + + public static final int PORT = 100; + + boolean borrowed = false; + + @Override + public int borrow() { + if (borrowed) { + return -1; + } + + borrowed = true; + return PORT; + } + + @Override + public void release(int port) { + if (borrowed) { + borrowed = false; + } + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java Fri Oct 04 12:40:06 2019 +0200 @@ -0,0 +1,7 @@ +package com.passus.st.emitter.pcap; + +import static org.testng.Assert.*; + +public class UnidirectionalUdpPcapEmitterTest { + +} \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/utils/NioServer.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/NioServer.java Fri Oct 04 12:40:06 2019 +0200 @@ -46,6 +46,14 @@ this.port = port; } + public ChannelHandler getServerHandler() { + return serverHandler; + } + + public void setServerHandler(ChannelHandler serverHandler) { + this.serverHandler = serverHandler; + } + public int getConnections() { return connectionsCounter.getConnections(); } @@ -99,10 +107,6 @@ } channel = bs.bind(address, port); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Server {}:{} started.", address, port); - } this.started = true; } catch (Exception ex) { LOGGER.error(ex.getMessage(), ex);
--- a/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java Fri Oct 04 12:40:06 2019 +0200 @@ -1,7 +1,5 @@ package com.passus.st.utils.server; -import com.passus.data.ByteBuff; - import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; @@ -27,10 +25,6 @@ return clientPort; } - public void write(ByteBuff buff) throws IOException { - write(buff.buffer(), buff.startIndex(), buff.length()); - } - public void write(byte[] data, int offset, int length) throws IOException { DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort); serverSocket.send(packet);
--- a/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java Thu Oct 03 14:49:05 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java Fri Oct 04 12:40:06 2019 +0200 @@ -6,7 +6,9 @@ public interface ServerContext { - void write(ByteBuff buff) throws IOException; + default void write(ByteBuff buff) throws IOException { + write(buff.buffer(), buff.startIndex(), buff.length()); + } void write(byte[] data, int offset, int length) throws IOException;