changeset 1297:2090eab05b58

Netflow clients moved from passus-net-ex
author Devel 1
date Tue, 11 Aug 2020 10:27:28 +0200
parents dcb1f063650c
children 90ce0004c15f
files stress-tester/src/main/java/com/passus/st/DpdkBatch.java stress-tester/src/main/java/com/passus/st/FrameUtils.java stress-tester/src/main/java/com/passus/st/NetflowClientBase.java stress-tester/src/main/java/com/passus/st/NetflowDatagramClient.java stress-tester/src/main/java/com/passus/st/NetflowDpdkClient.java stress-tester/src/main/java/com/passus/st/NetflowPcapClient.java
diffstat 6 files changed, 519 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/DpdkBatch.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,201 @@
+package com.passus.st;
+
+import com.passus.dpdk.DpdkAO;
+import java.util.List;
+
+import static com.passus.st.NetflowClientBase.MEGA;
+
+/**
+ * Base class for the strategies used to push a fixed list of frames through {@link DpdkAO}.
+ * The frame list is split into batches of at most {@code batchSize} frames and every subclass
+ * pre-packs those batches in the layout expected by the {@code DpdkAO} send method it calls.
+ *
+ * @author mikolaj.podbielski
+ */
+abstract class DpdkBatch {
+
+    /** Number of batches one pass over the frame list is split into (0 when batching is disabled). */
+    final int numBatches;
+    /** Number of frames in one pass. */
+    final int numFrames;
+    /** Sum of the lengths of all frames in one pass. */
+    final long numBytes;
+
+    /**
+     * @param batchSize maximum number of frames per batch; values {@code <= 0} mean "no batching"
+     * @param frames frames sent in one pass
+     */
+    public DpdkBatch(int batchSize, List<byte[]> frames) {
+        // resolve() creates NoBatch with batchSize <= 0; dividing by such a value would throw
+        // ArithmeticException for 0 and yield a meaningless count for negative sizes.
+        this.numBatches = batchSize > 0 ? (frames.size() + batchSize - 1) / batchSize : 0;
+        this.numFrames = frames.size();
+        this.numBytes = sumLengths(frames);
+    }
+
+    /** Adds up the lengths of all given frames. */
+    private static long sumLengths(List<byte[]> frames) {
+        long lengths = 0;
+        for (byte[] frame : frames) {
+            lengths += frame.length;
+        }
+        return lengths;
+    }
+
+    /**
+     * Runs {@link #runLoop()} {@code loops} times, then prints the totals, the measured
+     * throughput and the {@code DpdkAO} statistics.
+     *
+     * @param loops number of passes over the frame list
+     */
+    public void run(int loops) {
+        long t0 = System.currentTimeMillis();
+        for (int l = 0; l < loops; l++) {
+            runLoop();
+        }
+        t0 = System.currentTimeMillis() - t0;
+        // Widen before multiplying: frames * loops easily exceeds the int range in long runs.
+        final long totalFrames = (long) numFrames * loops;
+        // bytes -> megabits: divide by MEGA, and fold the factor 8 into the ms -> s conversion.
+        float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f);
+        float fps = totalFrames / (t0 / 1000.f);
+        System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, numFrames, numBytes);
+        System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps);
+        System.out.format("total %d frames in %d ms (%f f/s)\n", totalFrames, t0, fps);
+        DpdkAO.printStats();
+    }
+
+    /**
+     * Like {@link #run(int)}, but reports the send count, the duration and the {@code DpdkAO}
+     * statistics after every single pass instead of one summary at the end.
+     *
+     * @param loops number of passes over the frame list
+     */
+    public void runVerbose(int loops) {
+        for (int l = 0; l < loops; l++) {
+            long t0 = System.currentTimeMillis();
+            int sent = runLoop();
+            t0 = System.currentTimeMillis() - t0;
+            System.out.format("CLIENT loop %d sent %d of %d in %d ms thread %s\n",
+                    l, sent, numFrames, t0, Thread.currentThread().getName());
+            DpdkAO.printStats();
+        }
+    }
+
+    /**
+     * Sends every batch once.
+     *
+     * @return sum of the values returned by {@link #runBatch(int)}
+     */
+    public int runLoop() {
+        int sent = 0;
+        for (int bidx = 0; bidx < numBatches; bidx++) {
+            sent += runBatch(bidx);
+        }
+        return sent;
+    }
+
+    /**
+     * Sends the batch with the given index.
+     *
+     * @param bidx batch index in the range {@code [0, numBatches)}
+     * @return value reported by the underlying {@code DpdkAO} send call
+     */
+    public abstract int runBatch(int bidx);
+
+    /**
+     * Picks the sending strategy.
+     *
+     * @param batchSize frames per batch; {@code <= 0} selects frame-by-frame sending
+     * @param frames frames sent in one pass
+     * @param batchType 0, 1 or 2, selecting the batch layout (ignored when batching is disabled)
+     * @return strategy instance with all batches already prepared
+     * @throws IllegalArgumentException if batching is enabled and {@code batchType} is unknown
+     */
+    static DpdkBatch resolve(int batchSize, List<byte[]> frames, int batchType) {
+        if (batchSize <= 0) {
+            System.out.println("CLIENT: using sendPacket");
+            return new NoBatch(batchSize, frames);
+        }
+        switch (batchType) {
+            case 0:
+                System.out.println("CLIENT: using sendPackets(byte[], int[])");
+                return new Batch0(batchSize, frames);
+            case 1:
+                System.out.println("CLIENT: using sendPackets(byte[][])");
+                return new Batch1(batchSize, frames);
+            case 2:
+                System.out.println("CLIENT: using sendPackets(byte[][] int[] int)");
+                return new Batch2(batchSize, frames);
+            default:
+                throw new IllegalArgumentException("Unknown batch type: " + batchType);
+        }
+
+    }
+
+    /** Sends the frames one at a time with {@code DpdkAO.sendPacket}. */
+    static class NoBatch extends DpdkBatch {
+
+        final List<byte[]> frames;
+
+        public NoBatch(int batchSize, List<byte[]> frames) {
+            super(batchSize, frames);
+            this.frames = frames;
+        }
+
+        @Override
+        public int runLoop() {
+            int sent = 0;
+            for (byte[] frame : frames) {
+                sent += DpdkAO.sendPacket(frame.length, frame);
+            }
+            return sent;
+        }
+
+        /** Never called by this class: {@link #runLoop()} is overridden and bypasses batches. */
+        @Override
+        public int runBatch(int bidx) {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+    }
+
+    /**
+     * Every batch is one contiguous byte array holding its frames back to back, plus an array
+     * with the length of each frame; sent with {@code DpdkAO.sendPackets(byte[], int[])}.
+     */
+    static class Batch0 extends DpdkBatch {
+
+        final byte[][] batches;
+        final int[][] lengths;
+
+        public Batch0(int batchSize, List<byte[]> frames) {
+            super(batchSize, frames);
+            batches = new byte[numBatches][];
+            lengths = new int[numBatches][];
+
+            for (int bidx = 0; bidx < numBatches; bidx++) {
+                final int thisBatchBegin = batchSize * bidx;
+                // The last batch may be shorter than batchSize.
+                final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size());
+
+                // First pass: size of the contiguous buffer.
+                int thisBatchBytes = 0;
+                for (int fidx = thisBatchBegin; fidx < thisBatchEnd; fidx++) {
+                    thisBatchBytes += frames.get(fidx).length;
+                }
+                batches[bidx] = new byte[thisBatchBytes];
+                lengths[bidx] = new int[thisBatchEnd - thisBatchBegin];
+                // Second pass: copy the frames one after another and record their lengths.
+                int octIdx = 0;
+                int offIdx = 0;
+                for (int fidx = thisBatchBegin; fidx < thisBatchEnd; fidx++) {
+                    final byte[] frame = frames.get(fidx);
+                    final int flen = frame.length;
+                    lengths[bidx][offIdx++] = flen;
+                    System.arraycopy(frame, 0, batches[bidx], octIdx, flen);
+                    octIdx += flen;
+                }
+            }
+        }
+
+        @Override
+        public int runBatch(int bidx) {
+            return DpdkAO.sendPackets(batches[bidx], lengths[bidx]);
+        }
+    }
+
+    /**
+     * Every batch is an array of frame arrays sized exactly to the number of frames in it;
+     * sent with {@code DpdkAO.sendPackets(byte[][])}. Frames are referenced, not copied.
+     */
+    static class Batch1 extends DpdkBatch {
+
+        final byte[][][] batches;
+
+        public Batch1(int batchSize, List<byte[]> frames) {
+            super(batchSize, frames);
+            batches = new byte[numBatches][][];
+            for (int bidx = 0; bidx < numBatches; bidx++) {
+                final int thisBatchBegin = batchSize * bidx;
+                final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size());
+                batches[bidx] = new byte[thisBatchEnd - thisBatchBegin][];
+                for (int fidx = thisBatchBegin, ii = 0; fidx < thisBatchEnd; fidx++, ii++) {
+                    batches[bidx][ii] = frames.get(fidx);
+                }
+            }
+        }
+
+        @Override
+        public int runBatch(int bidx) {
+            return DpdkAO.sendPackets(batches[bidx]);
+        }
+    }
+
+    /**
+     * Every batch is an array of frame arrays with a parallel array of frame lengths and an
+     * explicit frame count; sent with {@code DpdkAO.sendPackets0(byte[][], int[], int)}.
+     */
+    static class Batch2 extends DpdkBatch {
+
+        final byte[][][] batches;
+        final int[][] lengths;
+        final int[] npkts;
+
+        public Batch2(int batchSize, List<byte[]> frames) {
+            super(batchSize, frames);
+            batches = new byte[numBatches][][];
+            lengths = new int[numBatches][];
+            npkts = new int[numBatches];
+            for (int bidx = 0; bidx < numBatches; bidx++) {
+                final int thisBatchBegin = batchSize * bidx;
+                final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size());
+                // The frame array always has batchSize slots, so a short last batch keeps
+                // trailing null entries; npkts holds the number of slots actually filled.
+                batches[bidx] = new byte[batchSize][];
+                lengths[bidx] = new int[thisBatchEnd - thisBatchBegin];
+                npkts[bidx] = thisBatchEnd - thisBatchBegin;
+                for (int fidx = thisBatchBegin, ii = 0; fidx < thisBatchEnd; fidx++, ii++) {
+                    batches[bidx][ii] = frames.get(fidx);
+                    lengths[bidx][ii] = frames.get(fidx).length;
+                }
+            }
+        }
+
+        @Override
+        public int runBatch(int bidx) {
+            return DpdkAO.sendPackets0(batches[bidx], lengths[bidx], npkts[bidx]);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/FrameUtils.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,58 @@
+package com.passus.st;
+
+import com.passus.data.DataUtils;
+import com.passus.net.packet.Ethernet;
+
+import static com.passus.net.FrameUtils.*;
+
+/**
+ * In-place rewriting of Ethernet / IPv4 / UDP header fields of a raw frame.
+ * All offsets assume the IPv4 header starts directly after the Ethernet header
+ * (at {@link #IP_OFFSET}).
+ *
+ * @author mikolaj.podbielski
+ */
+class FrameUtils {
+
+    /** Offset of the IP header inside the frame. */
+    static final int IP_OFFSET = Ethernet.HEADER_LENGTH;
+    /** Length of the UDP header in bytes. */
+    static final int UDP_LENGTH = 8;
+
+    /**
+     * Rewrites the MAC addresses, IP addresses and UDP ports of the frame and then stores
+     * freshly calculated IPv4 and UDP checksums.
+     *
+     * @param frameBytes frame to modify in place
+     * @param srcMac new source MAC, or {@code null} to keep the current one
+     * @param dstMac new destination MAC, or {@code null} to keep the current one
+     * @param srcIp new source IPv4 address (4 bytes), or {@code null} to keep the current one
+     * @param dstIp new destination IPv4 address (4 bytes), or {@code null} to keep the current one
+     * @param srcPort new UDP source port, or a negative value to keep the current one
+     * @param dstPort new UDP destination port, or a negative value to keep the current one
+     */
+    static void processFrame(byte[] frameBytes, byte[] srcMac, byte[] dstMac, byte[] srcIp, byte[] dstIp, int srcPort, int dstPort) {
+        final int ip4ChkOff = IP_OFFSET + 10;
+        // Low nibble of the first IP byte is the header length in 32-bit words.
+        final int ipLength = (frameBytes[IP_OFFSET] & 0x0f) << 2;
+        final int udpChkOff = IP_OFFSET + ipLength + 6;
+
+        rewriteMacs(frameBytes, srcMac, dstMac);
+        rewriteIps(frameBytes, srcIp, dstIp);
+        // Use the ports supplied by the caller; they used to be ignored in favour of the
+        // hard-coded pair 51340/2055, which made the clients' port option ineffective.
+        rewriteUdpPorts(frameBytes, srcPort, dstPort, ipLength);
+        // Checksums are calculated only after all fields have their final values.
+        int ip4Chk = calcIp4Checksum(frameBytes, IP_OFFSET);
+        int udpChk = calcUdpChecksum(frameBytes, IP_OFFSET, ipLength);
+        DataUtils.writeInt2(frameBytes, ip4ChkOff, ip4Chk);
+        DataUtils.writeInt2(frameBytes, udpChkOff, udpChk);
+    }
+
+    /**
+     * Overwrites the UDP source and/or destination port.
+     *
+     * @param frame frame to modify in place
+     * @param srcPort new source port; negative values leave the field untouched
+     * @param dstPort new destination port; negative values leave the field untouched
+     * @param ipLength length of the IP header in bytes
+     */
+    static void rewriteUdpPorts(byte[] frame, int srcPort, int dstPort, int ipLength) {
+        if (srcPort >= 0) {
+            DataUtils.writeInt2(frame, IP_OFFSET + ipLength, srcPort);
+        }
+        if (dstPort >= 0) {
+            DataUtils.writeInt2(frame, IP_OFFSET + ipLength + 2, dstPort);
+        }
+    }
+
+    /**
+     * Overwrites the IPv4 source and/or destination address (4 bytes each).
+     *
+     * @param frame frame to modify in place
+     * @param srcIp new source address, or {@code null} to leave the field untouched
+     * @param dstIp new destination address, or {@code null} to leave the field untouched
+     */
+    static void rewriteIps(byte[] frame, byte[] srcIp, byte[] dstIp) {
+        if (srcIp != null) {
+            System.arraycopy(srcIp, 0, frame, IP_OFFSET + 12, 4);
+        }
+        if (dstIp != null) {
+            System.arraycopy(dstIp, 0, frame, IP_OFFSET + 16, 4);
+        }
+    }
+
+    /**
+     * Overwrites the Ethernet destination (bytes 0-5) and/or source (bytes 6-11) MAC address.
+     *
+     * @param frame frame to modify in place
+     * @param srcMac new source MAC, or {@code null} to leave the field untouched
+     * @param dstMac new destination MAC, or {@code null} to leave the field untouched
+     */
+    static void rewriteMacs(byte[] frame, byte[] srcMac, byte[] dstMac) {
+        if (dstMac != null) {
+            System.arraycopy(dstMac, 0, frame, 0, 6);
+        }
+        if (srcMac != null) {
+            System.arraycopy(srcMac, 0, frame, 6, 6);
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/NetflowClientBase.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,38 @@
+package com.passus.st;
+
+import com.passus.net.packet.Udp;
+import com.passus.net.source.pcap.PcapUtils;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helpers shared by the Netflow replay clients: a unit constant and loaders that pull
+ * UDP traffic out of a capture file.
+ *
+ * @author mikolaj.podbielski
+ */
+public class NetflowClientBase {
+
+    /** 2^20, used to scale byte counts in throughput figures. */
+    protected static final float MEGA = 1 << 20;
+
+    /**
+     * Loads the UDP packets of the given capture and returns the bytes of the frame
+     * each of them belongs to. {@code null} entries in the packet list are skipped.
+     *
+     * @param input capture file to read
+     * @return frame bytes, one element per non-null UDP packet, in capture order
+     */
+    static ArrayList<byte[]> readUdpFrames(File input) {
+        final List<Udp> packets = PcapUtils.readUdpPackets(input);
+        final ArrayList<byte[]> result = new ArrayList<>(packets.size());
+        packets.forEach(packet -> {
+            if (packet == null) {
+                return;
+            }
+            result.add(packet.getFrame().getBytes());
+        });
+        return result;
+    }
+
+    /**
+     * Loads the UDP packets of the given capture and returns their payloads.
+     * {@code null} entries in the packet list are skipped.
+     *
+     * @param input capture file to read
+     * @return payloads, one element per non-null UDP packet, in capture order
+     */
+    static ArrayList<byte[]> readUdpPayloads(File input) {
+        final List<Udp> packets = PcapUtils.readUdpPackets(input);
+        final ArrayList<byte[]> result = new ArrayList<>(packets.size());
+        packets.forEach(packet -> {
+            if (packet == null) {
+                return;
+            }
+            result.add(packet.getPayload());
+        });
+        return result;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/NetflowDatagramClient.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,65 @@
+package com.passus.st;
+
+import java.io.File;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+
+/**
+ * Replays the UDP payloads of a capture file to a host through a regular
+ * {@link DatagramSocket} and prints the achieved throughput.
+ *
+ * @author mikolaj.podbielski
+ */
+public class NetflowDatagramClient extends NetflowClientBase {
+
+    /**
+     * Entry point. Positional arguments: capture file, destination host.
+     *
+     * @param args command line; see the option definitions below
+     * @throws IllegalArgumentException if fewer than two positional arguments are given
+     * @throws Exception on option parsing, address resolution or socket errors
+     */
+    public static void main(String... args) throws Exception {
+        final Options options = new Options();
+        // "-v" is accepted, but nothing in this client reads it.
+        options.addOption("v", "verbose", false, "Verbose mode.");
+        options.addOption("b", "bind", true, "Bind adderss.");
+        options.addOption("p", "port", true, "Destination port. [2055]");
+        options.addOption("l", "loops", true, "Number of PCAP loops. [1]");
+        CommandLine cl = new DefaultParser().parse(options, args);
+        String[] clArgs = cl.getArgs();
+        if (clArgs.length < 2) {
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+            throw new IllegalArgumentException(
+                    "Usage: NetflowDatagramClient [options] <pcap-file> <destination-host>");
+        }
+
+        File input = new File(clArgs[0]);
+        List<byte[]> frames = readUdpPayloads(input);
+        InetAddress addr = InetAddress.getByName(clArgs[1]);
+        int port = Integer.parseInt(cl.getOptionValue("p", "2055"));
+        int loops = Integer.parseInt(cl.getOptionValue("l", "1"));
+
+        // Wildcard address with port 0 unless a bind address was requested.
+        InetSocketAddress bindAddress = new InetSocketAddress(0);
+        if (cl.hasOption("b")) {
+            InetAddress bindAddr = InetAddress.getByName(cl.getOptionValue("b"));
+            bindAddress = new InetSocketAddress(bindAddr, 0);
+        }
+
+        System.out.println("Sending file: " + input.getAbsolutePath() + " to " + addr + ":" + port + " from " + bindAddress);
+
+        long numBytes = 0;
+        for (byte[] frame : frames) {
+            numBytes += frame.length;
+        }
+
+        try (DatagramSocket clientSocket = new DatagramSocket(bindAddress)) {
+            long t0 = System.currentTimeMillis();
+            for (int l = 0; l < loops; l++) {
+                for (byte[] payload : frames) {
+                    DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, addr, port);
+                    clientSocket.send(sendPacket);
+                }
+            }
+            t0 = System.currentTimeMillis() - t0;
+            // Widen before multiplying: frames * loops easily exceeds the int range.
+            final long totalFrames = (long) frames.size() * loops;
+            // bytes -> megabits: divide by MEGA, and fold the factor 8 into the ms -> s conversion.
+            float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f);
+            float fps = totalFrames / (t0 / 1000.f);
+            System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, frames.size(), numBytes);
+            System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps);
+            System.out.format("total %d frames in %d ms (%f f/s)\n", totalFrames, t0, fps);
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/NetflowDpdkClient.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,89 @@
+package com.passus.st;
+
+import com.passus.dpdk.DpdkAO;
+import com.passus.net.utils.AddressUtils;
+import java.io.File;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+
+import static com.passus.st.FrameUtils.*;
+
+/**
+ * Replays the UDP frames of a capture file through {@link DpdkAO}, after rewriting their
+ * MAC addresses, IP addresses and UDP ports, optionally from several threads.
+ *
+ * @author mikolaj.podbielski
+ */
+public class NetflowDpdkClient extends NetflowClientBase {
+
+    /** Number of required positional arguments. */
+    private static final int NUM_POSITIONAL_ARGS = 6;
+
+    /**
+     * Entry point. Positional arguments: capture file, mempool library (passed to EAL as
+     * {@code -d}), source MAC, destination MAC, source IPv4, destination IPv4.
+     *
+     * @param args command line; see the option definitions below
+     * @throws IllegalArgumentException if fewer than six positional arguments are given
+     * @throws Exception on option parsing errors or when waiting for worker threads is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+        final Options options = new Options();
+        options.addOption("v", "verbose", true, "Verbosity bitmask.");
+        options.addOption("p", "port", true, "Destination port. [2055]");
+        options.addOption("l", "loops", true, "Number of PCAP loops. [1]");
+        options.addOption("bs", "batch-size", true, "Number of packets to batch in one send(). [0]");
+        options.addOption("bt", "batch-type", true, "Type of batch: 0->b[],l[] 1->b[][], 2->[][]+. [0]");
+        options.addOption("th", "threads", true, "Number of threads.");
+        options.addOption("skipCleanup", "skipCleanup", false, "Skip DpdkAO.cleanApp()");
+        // NOTE(review): "app-args" is declared but never read; app_args below is fixed.
+        options.addOption("app", "app-args", true, "App args. [-- -p 1 --num-queues=2 --proc-id=0]");
+        CommandLine cl = new DefaultParser().parse(options, args);
+        String[] clArgs = cl.getArgs();
+        if (clArgs.length < NUM_POSITIONAL_ARGS) {
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Usage: NetflowDpdkClient [options] "
+                    + "<pcap-file> <mempool-lib> <src-mac> <dst-mac> <src-ip> <dst-ip>");
+        }
+
+        File input = new File(clArgs[0]);
+        String memPoolLib = clArgs[1];
+        byte[] srcMac = AddressUtils.stringToMACAddress(clArgs[2].trim());
+        byte[] dstMac = AddressUtils.stringToMACAddress(clArgs[3].trim());
+        byte[] srcIp = AddressUtils.stringToIp4Bytes(clArgs[4].trim());
+        byte[] dstIp = AddressUtils.stringToIp4Bytes(clArgs[5].trim());
+        // Verbosity accepts decimal, 0x-prefixed hex and 0b-prefixed binary notation.
+        int verbose = parseInt(cl.getOptionValue("v", "0"));
+        int port = Integer.parseInt(cl.getOptionValue("p", "2055"));
+        int loops = Integer.parseInt(cl.getOptionValue("l", "1"));
+        int batchSize = Integer.parseInt(cl.getOptionValue("bs", "0"));
+        int batchType = Integer.parseInt(cl.getOptionValue("bt", "0"));
+        int numThreads = Integer.parseInt(cl.getOptionValue("th", "0"));
+        boolean cleanup = !cl.hasOption("skipCleanup");
+
+        String[] eal_args = {"-l", "1,2", "-n", "4", "--proc-type=auto", "-d", memPoolLib};
+        String[] app_args = {"--", "-p", "1", "--num-queues=2", "--proc-id=0"};
+        DpdkAO.setVerbose(verbose);
+        DpdkAO.initEal(eal_args);
+        DpdkAO.initApp(app_args);
+
+        List<byte[]> frames = readUdpFrames(input);
+        for (byte[] frame : frames) {
+            processFrame(frame, srcMac, dstMac, srcIp, dstIp, 51340, port);
+        }
+
+        System.out.println("CLIENT: DPDK JNI build info: " + DpdkAO.buildInfo());
+        System.out.println("CLIENT: sending file " + input.getAbsolutePath());
+        DpdkBatch batch = DpdkBatch.resolve(batchSize, frames, batchType);
+        if (numThreads == 0) {
+            // No worker threads requested: send from the calling thread.
+            batch.run(loops);
+        } else {
+            // Every worker runs the full number of loops over the same prepared batch.
+            Thread[] threads = new Thread[numThreads];
+            for (int i = 0; i < numThreads; i++) {
+                threads[i] = new Thread(() -> batch.run(loops));
+            }
+            for (int i = 0; i < numThreads; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < numThreads; i++) {
+                threads[i].join();
+            }
+        }
+        if (cleanup) {
+            DpdkAO.cleanApp();
+        }
+    }
+
+    /**
+     * Parses an integer written in decimal, hexadecimal ({@code 0x} prefix) or binary
+     * ({@code 0b} prefix) notation. Case and surrounding whitespace are ignored.
+     *
+     * @param s text to parse
+     * @return parsed value
+     * @throws NumberFormatException if the digits are not valid for the detected radix
+     */
+    static int parseInt(String s) {
+        s = s.toLowerCase().trim();
+        if (s.startsWith("0x")) {
+            return Integer.parseInt(s.substring(2), 16);
+        } else if (s.startsWith("0b")) {
+            return Integer.parseInt(s.substring(2), 2);
+        } else {
+            return Integer.parseInt(s, 10);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/NetflowPcapClient.java	Tue Aug 11 10:27:28 2020 +0200
@@ -0,0 +1,68 @@
+package com.passus.st;
+
+import com.passus.net.utils.AddressUtils;
+import com.passus.pcap.Pcap;
+import java.io.File;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+
+import static com.passus.st.FrameUtils.*;
+
+/**
+ * Replays the UDP frames of a capture file on a live device through {@link Pcap}, after
+ * rewriting their MAC addresses, IP addresses and UDP ports, and prints the achieved throughput.
+ *
+ * @author mikolaj.podbielski
+ */
+public class NetflowPcapClient extends NetflowClientBase {
+
+    /** Number of required positional arguments. */
+    private static final int NUM_POSITIONAL_ARGS = 6;
+
+    /**
+     * Entry point. Positional arguments: capture file, device name, source MAC,
+     * destination MAC, source IPv4, destination IPv4.
+     *
+     * @param args command line; see the option definitions below
+     * @throws IllegalArgumentException if fewer than six positional arguments are given
+     * @throws Exception on option parsing errors or failures while sending
+     */
+    public static void main(String... args) throws Exception {
+        final Options options = new Options();
+        // "-v" is accepted, but nothing in this client reads it.
+        options.addOption("v", "verbose", false, "Verbose mode.");
+        options.addOption("p", "port", true, "Destination port. [2055]");
+        options.addOption("l", "loops", true, "Number of PCAP loops. [1]");
+        CommandLine cl = new DefaultParser().parse(options, args);
+        String[] clArgs = cl.getArgs();
+        if (clArgs.length < NUM_POSITIONAL_ARGS) {
+            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Usage: NetflowPcapClient [options] "
+                    + "<pcap-file> <device> <src-mac> <dst-mac> <src-ip> <dst-ip>");
+        }
+
+        File input = new File(clArgs[0]);
+        String device = clArgs[1];
+        byte[] srcMac = AddressUtils.stringToMACAddress(clArgs[2].trim());
+        byte[] dstMac = AddressUtils.stringToMACAddress(clArgs[3].trim());
+        byte[] srcIp = AddressUtils.stringToIp4Bytes(clArgs[4].trim());
+        byte[] dstIp = AddressUtils.stringToIp4Bytes(clArgs[5].trim());
+        int port = Integer.parseInt(cl.getOptionValue("p", "2055"));
+        int loops = Integer.parseInt(cl.getOptionValue("l", "1"));
+
+        // Rewrite every frame once up front; the send loop below only replays them.
+        long numBytes = 0;
+        List<byte[]> frames = readUdpFrames(input);
+        for (byte[] frame : frames) {
+            processFrame(frame, srcMac, dstMac, srcIp, dstIp, 51340, port);
+            numBytes += frame.length;
+        }
+
+        System.out.println("Sending file: " + input.getAbsolutePath());
+        StringBuilder errors = new StringBuilder();
+        try (Pcap pcap = Pcap.openLive(device, errors)) {
+            if (pcap == null) {
+                System.out.println("Cannot open device: " + device);
+                System.out.println("Error: " + errors.toString());
+                return;
+            }
+
+            long t0 = System.currentTimeMillis();
+            for (int l = 0; l < loops; l++) {
+                for (byte[] frame : frames) {
+                    pcap.sendPacket(frame.length, frame);
+                }
+            }
+            t0 = System.currentTimeMillis() - t0;
+            // Widen before multiplying: frames * loops easily exceeds the int range.
+            final long totalFrames = (long) frames.size() * loops;
+            // bytes -> megabits: divide by MEGA, and fold the factor 8 into the ms -> s conversion.
+            float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f);
+            float fps = totalFrames / (t0 / 1000.f);
+            System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, frames.size(), numBytes);
+            System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps);
+            System.out.format("total %d frames in %d ms (%f f/s)\n", totalFrames, t0, fps);
+        }
+    }
+
+}