changeset 1029:e47bfb487cfd

UnidirectionalUdpPcapEmitterTest.testSend + bugfix
author Devel 1
date Thu, 02 Apr 2020 10:26:17 +0200
parents 056fa28115ff
children 170c8ce25bef
files stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/utils/EventUtils.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java
diffstat 3 files changed, 157 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Thu Apr 02 08:52:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java	Thu Apr 02 10:26:17 2020 +0200
@@ -210,6 +210,7 @@
             }
             localMac = new MACAddress(localHwAddress);
             device = getPcapDeviceName(networkInterface);
+            LOGGER.debug("Found device " + device + " for ifc " + networkInterface);
         } catch (IOException ex) {
             doCatchException(channelContext, ex);
             return;
@@ -222,7 +223,6 @@
 
         MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp());
         channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac);
-
         sessions.put(sessionInfo, channelContext);
 
         try {
@@ -266,7 +266,7 @@
             return;
         }
 
-        Queue<ByteBuffer> queue = channelContext.dataQueue;
+        Queue<byte[]> queue = channelContext.dataQueue;
         if (queue.isEmpty()) {
             return;
         }
@@ -280,11 +280,15 @@
         int written = 0;
         try {
             Pcap pcap = channelContext.getPcap();
-            int length = 0;
-            if (pcap.sendPacket(length, channelContext.buffer) == 0) {
-                written = length;
-            } else {
-                throw new IOException("Unable to send packet. Pcap error.");
+
+            while (!queue.isEmpty()) {
+                byte[] buf = queue.poll();
+                int res = pcap.sendPacket(buf.length, buf);
+                if (res == 0) {
+                    written += buf.length;
+                } else {
+                    throw new IOException("Unable to send packet. Pcap error: " + pcap.getErr());
+                }
             }
         } catch (Exception e) {
             doCatchException(channelContext, e);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java	Thu Apr 02 10:26:17 2020 +0200
@@ -0,0 +1,113 @@
+package com.passus.st.utils;
+
+import com.passus.commons.utils.ArrayUtils;
+import com.passus.commons.utils.ResourceUtils;
+import com.passus.net.PortRangeSet;
+import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
+import com.passus.net.http.session.HttpSessionAnalyzer;
+import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
+import com.passus.net.session.SessionAnalyzer;
+import com.passus.st.Protocols;
+import com.passus.st.client.ArrayListEventHandler;
+import com.passus.st.client.Event;
+import com.passus.st.source.PcapSessionEventSource;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Mirosław Hawrot
+ */
+public class EventUtils {
+
+    private EventUtils() {
+    }
+
+    public static List<Event> readEvents(String pcapRelativeFile) {
+        return readEvents(pcapRelativeFile, null);
+    }
+
+    public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) {
+        File pcapFile = ResourceUtils.getFile(pcapRelativeFile);
+        return readEvents(pcapFile, props);
+    }
+
+    public static List<Event> readEvents(File pcapFile, Map<String, Object> props) {
+        try {
+            boolean allowPartialSession = false;
+            Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP);
+            PortRangeSet portsRange = null;
+            if (props != null) {
+                allowPartialSession = (boolean) props.getOrDefault("allowPartialSession", false);
+
+                Object ports = props.getOrDefault("ports", null);
+                if (ports instanceof PortRangeSet) {
+                    portsRange = (PortRangeSet) ports;
+                } else if (ports instanceof Integer) {
+                    portsRange = new PortRangeSet();
+                    portsRange.add((Integer) ports);
+                }
+
+                protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP));
+            }
+
+            PcapSessionEventSource src = new PcapSessionEventSource();
+            src.setAllowPartialSession(allowPartialSession);
+            src.setPcapFile(pcapFile.getAbsolutePath());
+
+            SessionAnalyzer analyzer;
+            if (protocols.contains(Protocols.HTTP)) {
+                analyzer = new HttpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
+            }
+
+            if (protocols.contains(Protocols.DNS)) {
+                analyzer = new DnsUdpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
+            }
+
+            if (protocols.contains(Protocols.NETFLOW)) {
+                analyzer = new NetflowUdpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
+            }
+
+            src.setLoops(1);
+
+            ArrayListEventHandler handler = new ArrayListEventHandler();
+            src.setHandler(handler);
+
+            src.start();
+            waitForSource(src);
+            src.stop();
+
+            return handler.getEvents();
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static void waitForSource(PcapSessionEventSource src) {
+        try {
+            Thread.sleep(200);
+        } catch (Exception e) {
+        }
+
+        while (src.isWorking()) {
+            try {
+                Thread.sleep(100);
+            } catch (Exception e) {
+            }
+        }
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java	Thu Apr 02 08:52:28 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java	Thu Apr 02 10:26:17 2020 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.emitter.pcap;
 
+import com.passus.data.HeapByteBuff;
 import com.passus.net.session.Session;
 import com.passus.st.AbstractWireMockTest;
 import com.passus.st.client.TestClientHandler;
@@ -54,4 +55,36 @@
         }
     }
 
+    @Test
+    public void testSend() throws Exception {
+        final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0};
+
+        UnidirectionalUdpPcapEmitter emitter = createEmitter(null);
+        emitter.start();
+        try {
+            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP);
+
+            TestClientHandler handler = new TestClientHandler() {
+
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    context.writeAndFlush(new HeapByteBuff(ETH));
+                    context.close();
+                }
+            };
+
+            emitter.connect(info, handler, 0);
+            AbstractEmitterTest.waitConn(handler, 5, 5_000);
+
+            AssertJUnit.assertEquals(5, handler.size());
+            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType());
+            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType());
+            AssertJUnit.assertEquals(TestClientHandler.EventType.DATA_WRITTEN, handler.get(2).getType());
+            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(3).getType());
+            AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(4).getType());
+        } finally {
+            emitter.stop();
+        }
+    }
+
 }