Mercurial > stress-tester
changeset 1005:c91b2c0ca234
UnidirectionalUdpPcapEmitter in progress
author | Devel 2 |
---|---|
date | Wed, 18 Mar 2020 12:27:55 +0100 |
parents | 7ace98380747 |
children | dadf915aa1f6 |
files | stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java |
diffstat | 4 files changed, 164 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Wed Mar 18 12:27:23 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Wed Mar 18 12:27:55 2020 +0100 @@ -3,11 +3,10 @@ import com.passus.data.ByteBuff; import com.passus.net.MACAddress; import com.passus.net.SocketAddress; +import com.passus.pcap.Pcap; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask; -import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask; import java.io.IOException; import java.util.LinkedList; @@ -15,9 +14,15 @@ public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> { + private static final int DEFAULT_BUFFER_SIZE = 65 * 1024; + private final UnidirectionalPcapWorker worker; - private final Queue<byte[]> dataQueue; + private Pcap pcap; + + final Queue<byte[]> dataQueue; + + final byte[] buffer; private K attachment; @@ -33,9 +38,10 @@ private final MACAddress remoteHardwareAddress; + private String device; + public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker, - EmitterHandler handler, - SessionInfo sessionInfo, + EmitterHandler handler, SessionInfo sessionInfo, SocketAddress localAddress, SocketAddress remoteAddress, MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) { this.worker = worker; @@ -46,6 +52,23 @@ this.remoteAddress = remoteAddress; this.localHardwareAddress = localHardwareAddress; this.remoteHardwareAddress = remoteHardwareAddress; + this.buffer = new byte[DEFAULT_BUFFER_SIZE]; + } + + Pcap getPcap() { + return pcap; + } + + void setPcap(Pcap pcap) { + this.pcap = pcap; + } + + String getDevice() { + return device; + } + + void setDevice(String device) { + this.device = device; } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Wed Mar 18 12:27:23 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Wed Mar 18 12:27:55 2020 +0100 @@ -4,6 +4,8 @@ import com.passus.net.IpAddress; import com.passus.net.MACAddress; import com.passus.net.SocketAddress; +import com.passus.pcap.Pcap; +import com.passus.pcap.PcapIfc; import com.passus.st.emitter.*; import com.passus.st.emitter.SessionMapper.ConnectionParams; import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask; @@ -12,14 +14,19 @@ import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; +import org.apache.commons.lang3.SystemUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -47,6 +54,8 @@ private MACAddressResolver macResolver; + private final Map<String, PcapInstance> pcaps = new HashMap<>(); + public PortPool getPortPool() { return portPool; } @@ -92,7 +101,8 @@ } } - private void doCatchException(ChannelContext channelContext, EmitterHandler handler, Throwable cause) { + private void doCatchException(UnidirectionalPcapChannelContext channelContext, Throwable cause) { + EmitterHandler handler = channelContext == null ? null : channelContext.getHandler(); EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER); } @@ -148,6 +158,24 @@ tasks.add(new CloseTask(sessionInfo)); } + private String getPcapDeviceName(NetworkInterface ifc) throws SocketException { + //Windows workaround + if (SystemUtils.IS_OS_WINDOWS) { + byte[] ifcMac = ifc.getHardwareAddress(); + StringBuilder sb = new StringBuilder(); + PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb); + for (int i = 0; i < pcapIfcs.length; i++) { + PcapIfc pcapIfc = pcapIfcs[i]; + byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name); + if (Arrays.equals(ifcMac, pcapIfcMac)) { + return pcapIfc.name; + } + } + } + + return ifc.getName(); + } + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); if (connParams == null) { @@ -175,9 +203,9 @@ } localMac = new MACAddress(networkInterface.getHardwareAddress()); - device = networkInterface.getName(); + device = getPcapDeviceName(networkInterface); } catch (IOException ex) { - doCatchException(channelContext, handler, ex); + doCatchException(channelContext, ex); return; } @@ -194,29 +222,111 @@ try { handler.channelRegistered(channelContext); } catch (Exception ex) { - doCatchException(channelContext, handler, ex); + doCatchException(channelContext, ex); } try { + PcapInstance pcapInstance = pcaps.get(device); + if (pcapInstance == null) { + StringBuilder sb = new StringBuilder(); + Pcap pcap = Pcap.openLive(device, sb); + if (pcap == null) { + doClose(sessionInfo); + throw new IOException("Unable to open pcap on device '" + device + ".' " + sb); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Pcap instance for device {} created.", device); + } + + pcapInstance = new PcapInstance(pcap); + pcaps.put(device, pcapInstance); + } + + channelContext.setPcap(pcapInstance.pcap); + channelContext.setDevice(device); + pcapInstance.borrows++; + handler.channelActive(channelContext); } catch (Exception ex) { - doCatchException(channelContext, handler, ex); + doCatchException(channelContext, ex); + } + } + + private void doWrite(SessionInfo sessionInfo) { + UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo); + if (channelContext == null) { + LOGGER.debug("Unable to find context for session {}.", sessionInfo); + return; + } + + Queue<ByteBuffer> queue = channelContext.dataQueue; + if (queue.isEmpty()) { + return; + } + + EmitterHandler handler = channelContext.getHandler(); + handler.dataWriteStart(channelContext); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + int written = 0; + try { + Pcap pcap = channelContext.getPcap(); + int length = 0; + if (pcap.sendPacket(length, channelContext.buffer) == 0) { + written = length; + } else { + throw new IOException("Unable to send packet. Pcap error."); + } + } catch (Exception e) { + doCatchException(channelContext, e); + doClose(channelContext); + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + try { + handler.dataWritten(channelContext); + LOGGER.debug("Write handled."); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); } } private void doClose(SessionInfo sessionInfo) { UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo); if (channelContext == null) { - LOGGER.debug("Unable to find context for session " + sessionInfo + "."); + LOGGER.debug("Unable to find context for session {}.", sessionInfo); return; } + doClose(channelContext); + } + + private void doClose(UnidirectionalPcapChannelContext channelContext) { + SessionInfo sessionInfo = channelContext.getSessionInfo(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closing session '" + sessionInfo + "'."); } EmitterHandler handler = channelContext.getHandler(); portPool.release(sessionInfo.getSrcPort()); + + String device = channelContext.getDevice(); + PcapInstance pcapInstance = pcaps.get(device); + if (pcapInstance != null) { + pcapInstance.borrows--; + if (pcapInstance.borrows == 0) { + pcapInstance.pcap.close(); + pcaps.remove(device); + } + } + try { handler.channelInactive(channelContext); } catch (Exception e) { @@ -255,9 +365,10 @@ ConnectTask connectTask = (ConnectTask) task; doConnect(connectTask.sessionInfo, connectTask.handler); break; - /* case Task.FLUSH: - doWrite(); - break;*/ + case Task.FLUSH: + FlushTask flushTask = (FlushTask) task; + doWrite(flushTask.sessionInfo); + break; case Task.CLOSE: CloseTask closeTask = (CloseTask) task; doClose(closeTask.sessionInfo); @@ -267,4 +378,15 @@ } } + + private static class PcapInstance { + + private final Pcap pcap; + + private int borrows; + + public PcapInstance(Pcap pcap) { + this.pcap = pcap; + } + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java Wed Mar 18 12:27:23 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java Wed Mar 18 12:27:55 2020 +0100 @@ -1,6 +1,7 @@ package com.passus.st.emitter.pcap; import com.passus.commons.Assert; +import com.passus.net.session.Session; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; @@ -125,6 +126,10 @@ @Override public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { + if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) { + throw new IllegalArgumentException("Only UDP transport supported."); + } + int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; int index = hashCode % workers.length; workers[index].connect(sessionInfo, handler);
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java Wed Mar 18 12:27:23 2020 +0100 +++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java Wed Mar 18 12:27:55 2020 +0100 @@ -20,7 +20,6 @@ public class UnidirectionalPcapWorkerTest { - private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver(); private final TestPortPool portPool = new TestPortPool(); @@ -84,7 +83,6 @@ } } - private class TestPortPool implements PortPool { public static final int PORT = 100;