changeset 1005:c91b2c0ca234

UnidirectionalUdpPcapEmitter in progress
author Devel 2
date Wed, 18 Mar 2020 12:27:55 +0100
parents 7ace98380747
children dadf915aa1f6
files stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java
diffstat 4 files changed, 164 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Wed Mar 18 12:27:23 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java	Wed Mar 18 12:27:55 2020 +0100
@@ -3,11 +3,10 @@
 import com.passus.data.ByteBuff;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
+import com.passus.pcap.Pcap;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask;
-import com.passus.st.emitter.pcap.UnidirectionalTasks.FlushTask;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -15,9 +14,15 @@
 
 public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> {
 
+    private static final int DEFAULT_BUFFER_SIZE = 65 * 1024;
+
     private final UnidirectionalPcapWorker worker;
 
-    private final Queue<byte[]> dataQueue;
+    private Pcap pcap;
+
+    final Queue<byte[]> dataQueue;
+
+    final byte[] buffer;
 
     private K attachment;
 
@@ -33,9 +38,10 @@
 
     private final MACAddress remoteHardwareAddress;
 
+    private String device;
+
     public UnidirectionalPcapChannelContext(UnidirectionalPcapWorker worker,
-                                            EmitterHandler handler,
-                                            SessionInfo sessionInfo,
+                                            EmitterHandler handler, SessionInfo sessionInfo,
                                             SocketAddress localAddress, SocketAddress remoteAddress,
                                             MACAddress localHardwareAddress, MACAddress remoteHardwareAddress) {
         this.worker = worker;
@@ -46,6 +52,23 @@
         this.remoteAddress = remoteAddress;
         this.localHardwareAddress = localHardwareAddress;
         this.remoteHardwareAddress = remoteHardwareAddress;
+        this.buffer = new byte[DEFAULT_BUFFER_SIZE];
+    }
+
+    Pcap getPcap() {
+        return pcap;
+    }
+
+    void setPcap(Pcap pcap) {
+        this.pcap = pcap;
+    }
+
+    String getDevice() {
+        return device;
+    }
+
+    void setDevice(String device) {
+        this.device = device;
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Wed Mar 18 12:27:23 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Wed Mar 18 12:27:55 2020 +0100
@@ -4,6 +4,8 @@
 import com.passus.net.IpAddress;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
+import com.passus.pcap.Pcap;
+import com.passus.pcap.PcapIfc;
 import com.passus.st.emitter.*;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
 import com.passus.st.emitter.pcap.UnidirectionalTasks.CloseTask;
@@ -12,14 +14,19 @@
 import com.passus.st.emitter.pcap.UnidirectionalTasks.UnidirectionalTask;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -47,6 +54,8 @@
 
     private MACAddressResolver macResolver;
 
+    private final Map<String, PcapInstance> pcaps = new HashMap<>();
+
     public PortPool getPortPool() {
         return portPool;
     }
@@ -92,7 +101,8 @@
         }
     }
 
-    private void doCatchException(ChannelContext channelContext, EmitterHandler handler, Throwable cause) {
+    private void doCatchException(UnidirectionalPcapChannelContext channelContext, Throwable cause) {
+        EmitterHandler handler = channelContext == null ? null : channelContext.getHandler();
         EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER);
     }
 
@@ -148,6 +158,24 @@
         tasks.add(new CloseTask(sessionInfo));
     }
 
+    private String getPcapDeviceName(NetworkInterface ifc) throws SocketException {
+        //Windows workaround
+        if (SystemUtils.IS_OS_WINDOWS) {
+            byte[] ifcMac = ifc.getHardwareAddress();
+            StringBuilder sb = new StringBuilder();
+            PcapIfc[] pcapIfcs = Pcap.listAvailableDevices(sb);
+            for (int i = 0; i < pcapIfcs.length; i++) {
+                PcapIfc pcapIfc = pcapIfcs[i];
+                byte[] pcapIfcMac = Pcap.getHardwareAddress(pcapIfc.name);
+                if (Arrays.equals(ifcMac, pcapIfcMac)) {
+                    return pcapIfc.name;
+                }
+            }
+        }
+
+        return ifc.getName();
+    }
+
     private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
         ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER);
         if (connParams == null) {
@@ -175,9 +203,9 @@
             }
 
             localMac = new MACAddress(networkInterface.getHardwareAddress());
-            device = networkInterface.getName();
+            device = getPcapDeviceName(networkInterface);
         } catch (IOException ex) {
-            doCatchException(channelContext, handler, ex);
+            doCatchException(channelContext, ex);
             return;
         }
 
@@ -194,29 +222,111 @@
         try {
             handler.channelRegistered(channelContext);
         } catch (Exception ex) {
-            doCatchException(channelContext, handler, ex);
+            doCatchException(channelContext, ex);
         }
 
         try {
+            PcapInstance pcapInstance = pcaps.get(device);
+            if (pcapInstance == null) {
+                StringBuilder sb = new StringBuilder();
+                Pcap pcap = Pcap.openLive(device, sb);
+                if (pcap == null) {
+                    doClose(sessionInfo);
+                    throw new IOException("Unable to open pcap on device '" + device + ".' " + sb);
+                }
+
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Pcap instance for device {} created.", device);
+                }
+
+                pcapInstance = new PcapInstance(pcap);
+                pcaps.put(device, pcapInstance);
+            }
+
+            channelContext.setPcap(pcapInstance.pcap);
+            channelContext.setDevice(device);
+            pcapInstance.borrows++;
+
             handler.channelActive(channelContext);
         } catch (Exception ex) {
-            doCatchException(channelContext, handler, ex);
+            doCatchException(channelContext, ex);
+        }
+    }
+
+    private void doWrite(SessionInfo sessionInfo) {
+        UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo);
+        if (channelContext == null) {
+            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
+            return;
+        }
+
+        Queue<ByteBuffer> queue = channelContext.dataQueue;
+        if (queue.isEmpty()) {
+            return;
+        }
+
+        EmitterHandler handler = channelContext.getHandler();
+        handler.dataWriteStart(channelContext);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+        }
+
+        int written = 0;
+        try {
+            Pcap pcap = channelContext.getPcap();
+            int length = 0;
+            if (pcap.sendPacket(length, channelContext.buffer) == 0) {
+                written = length;
+            } else {
+                throw new IOException("Unable to send packet. Pcap error.");
+            }
+        } catch (Exception e) {
+            doCatchException(channelContext, e);
+            doClose(channelContext);
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Written {}B ({} -> {})", written, channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+        }
+
+        try {
+            handler.dataWritten(channelContext);
+            LOGGER.debug("Write handled.");
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
         }
     }
 
     private void doClose(SessionInfo sessionInfo) {
         UnidirectionalPcapChannelContext channelContext = sessions.get(sessionInfo);
         if (channelContext == null) {
-            LOGGER.debug("Unable to find context for session " + sessionInfo + ".");
+            LOGGER.debug("Unable to find context for session {}.", sessionInfo);
             return;
         }
 
+        doClose(channelContext);
+    }
+
+    private void doClose(UnidirectionalPcapChannelContext channelContext) {
+        SessionInfo sessionInfo = channelContext.getSessionInfo();
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closing session '" + sessionInfo + "'.");
         }
 
         EmitterHandler handler = channelContext.getHandler();
         portPool.release(sessionInfo.getSrcPort());
+
+        String device = channelContext.getDevice();
+        PcapInstance pcapInstance = pcaps.get(device);
+        if (pcapInstance != null) {
+            pcapInstance.borrows--;
+            if (pcapInstance.borrows == 0) {
+                pcapInstance.pcap.close();
+                pcaps.remove(device);
+            }
+        }
+
         try {
             handler.channelInactive(channelContext);
         } catch (Exception e) {
@@ -255,9 +365,10 @@
                         ConnectTask connectTask = (ConnectTask) task;
                         doConnect(connectTask.sessionInfo, connectTask.handler);
                         break;
-     /*               case Task.FLUSH:
-                        doWrite();
-                        break;*/
+                    case Task.FLUSH:
+                        FlushTask flushTask = (FlushTask) task;
+                        doWrite(flushTask.sessionInfo);
+                        break;
                     case Task.CLOSE:
                         CloseTask closeTask = (CloseTask) task;
                         doClose(closeTask.sessionInfo);
@@ -267,4 +378,15 @@
 
         }
     }
+
+    private static class PcapInstance {
+
+        private final Pcap pcap;
+
+        private int borrows;
+
+        public PcapInstance(Pcap pcap) {
+            this.pcap = pcap;
+        }
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java	Wed Mar 18 12:27:23 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitter.java	Wed Mar 18 12:27:55 2020 +0100
@@ -1,6 +1,7 @@
 package com.passus.st.emitter.pcap;
 
 import com.passus.commons.Assert;
+import com.passus.net.session.Session;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 
@@ -125,6 +126,10 @@
 
     @Override
     public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
+        if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) {
+            throw new IllegalArgumentException("Only UDP transport supported.");
+        }
+
         int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
         int index = hashCode % workers.length;
         workers[index].connect(sessionInfo, handler);
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java	Wed Mar 18 12:27:23 2020 +0100
+++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorkerTest.java	Wed Mar 18 12:27:55 2020 +0100
@@ -20,7 +20,6 @@
 
 public class UnidirectionalPcapWorkerTest {
 
-
     private final MapBasedMACAddressResolver macResolver = new MapBasedMACAddressResolver();
 
     private final TestPortPool portPool = new TestPortPool();
@@ -84,7 +83,6 @@
         }
     }
 
-
     private class TestPortPool implements PortPool {
 
         public static final int PORT = 100;