Mercurial > stress-tester
changeset 1297:2090eab05b58
Netflow clients moved from passus-net-ex
author | Devel 1 |
---|---|
date | Tue, 11 Aug 2020 10:27:28 +0200 |
parents | dcb1f063650c |
children | 90ce0004c15f |
files | stress-tester/src/main/java/com/passus/st/DpdkBatch.java stress-tester/src/main/java/com/passus/st/FrameUtils.java stress-tester/src/main/java/com/passus/st/NetflowClientBase.java stress-tester/src/main/java/com/passus/st/NetflowDatagramClient.java stress-tester/src/main/java/com/passus/st/NetflowDpdkClient.java stress-tester/src/main/java/com/passus/st/NetflowPcapClient.java |
diffstat | 6 files changed, 519 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/DpdkBatch.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,201 @@ +package com.passus.st; + +import com.passus.dpdk.DpdkAO; +import java.util.List; + +import static com.passus.st.NetflowClientBase.MEGA; + +/** + * + * @author mikolaj.podbielski + */ +abstract class DpdkBatch { + + final int numBatches; + final int numFrames; + final long numBytes; + + public DpdkBatch(int batchSize, List<byte[]> frames) { + this.numBatches = (frames.size() + batchSize - 1) / batchSize; + this.numFrames = frames.size(); + this.numBytes = sumLengths(frames); + } + + private static long sumLengths(List<byte[]> frames) { + long lengths = 0; + for (byte[] frame : frames) { + lengths += frame.length; + } + return lengths; + } + + public void run(int loops) { + long t0 = System.currentTimeMillis(); + for (int l = 0; l < loops; l++) { + runLoop(); + } + t0 = System.currentTimeMillis() - t0; + float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f); + float fps = (numFrames * loops) / (t0 / 1000.f); + System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, numFrames, numBytes); + System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps); + System.out.format("total %d frames in %d ms (%f f/s)\n", numFrames * loops, t0, fps); + DpdkAO.printStats(); + } + + public void runVerbose(int loops) { + for (int l = 0; l < loops; l++) { + long t0 = System.currentTimeMillis(); + int sent = runLoop(); + t0 = System.currentTimeMillis() - t0; + System.out.format("CLIENT loop %d sent %d of %d in %d ms thread %s\n", + l, sent, numFrames, t0, Thread.currentThread().getName()); + DpdkAO.printStats(); + } + } + + public int runLoop() { + int sent = 0; + for (int bidx = 0; bidx < numBatches; bidx++) { + sent += runBatch(bidx); + } + return sent; + } + + public abstract int runBatch(int bidx); + + static DpdkBatch resolve(int batchSize, List<byte[]> frames, int batchType) { + if (batchSize <= 0) { + System.out.println("CLIENT: using sendPacket"); + return new NoBatch(batchSize, frames); + } + switch (batchType) { + case 0: + System.out.println("CLIENT: using sendPackets(byte[], int[])"); + return new Batch0(batchSize, frames); + case 1: + System.out.println("CLIENT: using sendPackets(byte[][])"); + return new Batch1(batchSize, frames); + case 2: + System.out.println("CLIENT: using sendPackets(byte[][] int[] int)"); + return new Batch2(batchSize, frames); + default: + throw new IllegalArgumentException(); + } + + } + + static class NoBatch extends DpdkBatch { + + final List<byte[]> frames; + + public NoBatch(int batchSize, List<byte[]> frames) { + super(batchSize, frames); + this.frames = frames; + } + + @Override + public int runLoop() { + int sent = 0; + for (byte[] frame : frames) { + sent += DpdkAO.sendPacket(frame.length, frame); + } + return sent; + } + + @Override + public int runBatch(int bidx) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + } + + static class Batch0 extends DpdkBatch { + + final byte[][] batches; + final int[][] lengths; + + public Batch0(int batchSize, List<byte[]> frames) { + super(batchSize, frames); + batches = new byte[numBatches][]; + lengths = new int[numBatches][]; + + for (int bidx = 0; bidx < numBatches; bidx++) { + final int thisBatchBegin = batchSize * bidx; + final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size()); + + int thisBatchBytes = 0; + for (int fidx = thisBatchBegin; fidx < thisBatchEnd; fidx++) { + thisBatchBytes += frames.get(fidx).length; + } + batches[bidx] = new byte[thisBatchBytes]; + lengths[bidx] = new int[thisBatchEnd - thisBatchBegin]; + int octIdx = 0; + int offIdx = 0; + for (int fidx = thisBatchBegin; fidx < thisBatchEnd; fidx++) { + int flen = frames.get(fidx).length; + lengths[bidx][offIdx++] = flen; + System.arraycopy(frames.get(fidx), 0, batches[bidx], octIdx, flen); + octIdx += flen; + } + } + } + + @Override + public int runBatch(int bidx) { + return DpdkAO.sendPackets(batches[bidx], lengths[bidx]); + } + } + + static class Batch1 extends DpdkBatch { + + final byte[][][] batches; + + public Batch1(int batchSize, List<byte[]> frames) { + super(batchSize, frames); + batches = new byte[numBatches][][]; + for (int bidx = 0; bidx < numBatches; bidx++) { + final int thisBatchBegin = batchSize * bidx; + final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size()); + batches[bidx] = new byte[thisBatchEnd - thisBatchBegin][]; + for (int fidx = thisBatchBegin, ii = 0; fidx < thisBatchEnd; fidx++, ii++) { + batches[bidx][ii] = frames.get(fidx); + } + } + } + + @Override + public int runBatch(int bidx) { + return DpdkAO.sendPackets(batches[bidx]); + } + } + + static class Batch2 extends DpdkBatch { + + final byte[][][] batches; + final int[][] lengths; + final int[] npkts; + + public Batch2(int batchSize, List<byte[]> frames) { + super(batchSize, frames); + batches = new byte[numBatches][][]; + lengths = new int[numBatches][]; + npkts = new int[numBatches]; + for (int bidx = 0; bidx < numBatches; bidx++) { + final int thisBatchBegin = batchSize * bidx; + final int thisBatchEnd = Math.min(thisBatchBegin + batchSize, frames.size()); + batches[bidx] = new byte[batchSize][]; + lengths[bidx] = new int[thisBatchEnd - thisBatchBegin]; + npkts[bidx] = thisBatchEnd - thisBatchBegin; + for (int fidx = thisBatchBegin, ii = 0; fidx < thisBatchEnd; fidx++, ii++) { + batches[bidx][ii] = frames.get(fidx); + lengths[bidx][ii] = frames.get(fidx).length; + } + } + } + + @Override + public int runBatch(int bidx) { + return DpdkAO.sendPackets0(batches[bidx], lengths[bidx], npkts[bidx]); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/FrameUtils.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,58 @@ +package com.passus.st; + +import com.passus.data.DataUtils; +import com.passus.net.packet.Ethernet; + +import static com.passus.net.FrameUtils.*; + +/** + * + * @author mikolaj.podbielski + */ +class FrameUtils { + + static final int IP_OFFSET = Ethernet.HEADER_LENGTH; + static final int UDP_LENGTH = 8; + + static void processFrame(byte[] frameBytes, byte[] srcMac, byte[] dstMac, byte[] srcIp, byte[] dstIp, int srcPort, int dstPort) { + final int ip4ChkOff = IP_OFFSET + 10; + final int ipLength = (frameBytes[IP_OFFSET] & 0x0f) << 2; + final int udpChkOff = IP_OFFSET + ipLength + 6; + + rewriteMacs(frameBytes, srcMac, dstMac); + rewriteIps(frameBytes, srcIp, dstIp); + rewriteUdpPorts(frameBytes, 51340, 2055, ipLength); + int ip4Chk = calcIp4Checksum(frameBytes, IP_OFFSET); + int udpChk = calcUdpChecksum(frameBytes, IP_OFFSET, ipLength); + DataUtils.writeInt2(frameBytes, ip4ChkOff, ip4Chk); + DataUtils.writeInt2(frameBytes, udpChkOff, udpChk); + } + + static void rewriteUdpPorts(byte[] frame, int srcPort, int dstPort, int ipLength) { + if (srcPort >= 0) { + DataUtils.writeInt2(frame, IP_OFFSET + ipLength, srcPort); + } + if (dstPort >= 0) { + DataUtils.writeInt2(frame, IP_OFFSET + ipLength + 2, dstPort); + } + } + + static void rewriteIps(byte[] frame, byte[] srcIp, byte[] dstIp) { + if (srcIp != null) { + System.arraycopy(srcIp, 0, frame, IP_OFFSET + 12, 4); + } + if (dstIp != null) { + System.arraycopy(dstIp, 0, frame, IP_OFFSET + 16, 4); + } + } + + static void rewriteMacs(byte[] frame, byte[] srcMac, byte[] dstMac) { + if (dstMac != null) { + System.arraycopy(dstMac, 0, frame, 0, 6); + } + if (srcMac != null) { + System.arraycopy(srcMac, 0, frame, 6, 6); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/NetflowClientBase.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,38 @@ +package com.passus.st; + +import com.passus.net.packet.Udp; +import com.passus.net.source.pcap.PcapUtils; +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * + * @author mikolaj.podbielski + */ +public class NetflowClientBase { + + protected static final float MEGA = 1 << 20; + + static ArrayList<byte[]> readUdpFrames(File input) { + List<Udp> udps = PcapUtils.readUdpPackets(input); + ArrayList<byte[]> frames = new ArrayList<>(udps.size()); + for (Udp udp : udps) { + if (udp != null) { + frames.add(udp.getFrame().getBytes()); + } + } + return frames; + } + + static ArrayList<byte[]> readUdpPayloads(File input) { + List<Udp> udps = PcapUtils.readUdpPackets(input); + ArrayList<byte[]> payloads = new ArrayList<>(udps.size()); + for (Udp udp : udps) { + if (udp != null) { + payloads.add(udp.getPayload()); + } + } + return payloads; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/NetflowDatagramClient.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,65 @@ +package com.passus.st; + +import java.io.File; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + +/** + * + * @author mikolaj.podbielski + */ +public class NetflowDatagramClient extends NetflowClientBase { + + public static void main(String... args) throws Exception { + final Options options = new Options(); + options.addOption("v", "verbose", false, "Verbose mode."); + options.addOption("b", "bind", true, "Bind adderss."); + options.addOption("p", "port", true, "Destination port. [2055]"); + options.addOption("l", "loops", true, "Number of PCAP loops. [1]"); + CommandLine cl = new DefaultParser().parse(options, args); + String[] clArgs = cl.getArgs(); + + File input = new File(clArgs[0]); + List<byte[]> frames = readUdpPayloads(input); + InetAddress addr = InetAddress.getByName(clArgs[1]); + boolean verbose = cl.hasOption("v"); + int port = Integer.parseInt(cl.getOptionValue("p", "2055")); + int loops = Integer.parseInt(cl.getOptionValue("l", "1")); + + InetSocketAddress bindAddress = new InetSocketAddress(0); + if (cl.hasOption("b")) { + InetAddress bindAddr = InetAddress.getByName(cl.getOptionValue("b")); + bindAddress = new InetSocketAddress(bindAddr, 0); + } + + System.out.println("Sending file: " + input.getAbsolutePath() + " to " + addr + ":" + port + " from " + bindAddress); + + long numBytes = 0; + for (byte[] frame : frames) { + numBytes += frame.length; + } + + try (DatagramSocket clientSocket = new DatagramSocket(bindAddress)) { + long t0 = System.currentTimeMillis(); + for (int l = 0; l < loops; l++) { + for (byte[] payload : frames) { + DatagramPacket sendPacket = new DatagramPacket(payload, payload.length, addr, port); + clientSocket.send(sendPacket); + } + } + t0 = System.currentTimeMillis() - t0; + float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f); + float fps = (frames.size() * loops) / (t0 / 1000.f); + System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, frames.size(), numBytes); + System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps); + System.out.format("total %d frames in %d ms (%f f/s)\n", frames.size() * loops, t0, fps); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/NetflowDpdkClient.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,89 @@ +package com.passus.st; + +import com.passus.dpdk.DpdkAO; +import com.passus.net.utils.AddressUtils; +import java.io.File; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + +import static com.passus.st.FrameUtils.*; + +/** + * + * @author mikolaj.podbielski + */ +public class NetflowDpdkClient extends NetflowClientBase { + + public static void main(String[] args) throws Exception { + final Options options = new Options(); + options.addOption("v", "verbose", true, "Verbosity bitmask."); + options.addOption("p", "port", true, "Destination port. [2055]"); + options.addOption("l", "loops", true, "Number of PCAP loops. [1]"); + options.addOption("bs", "batch-size", true, "Number of packets to batch in one send(). [0]"); + options.addOption("bt", "batch-type", true, "Type of batch: 0->b[],l[] 1->b[][], 2->[][]+. [0]"); + options.addOption("th", "threads", true, "Number of threads."); + options.addOption("skipCleanup", "skipCleanup", false, "Skip DpdkAO.cleanApp()"); + options.addOption("app", "app-args", true, "App args. [-- -p 1 --num-queues=2 --proc-id=0]"); + CommandLine cl = new DefaultParser().parse(options, args); + String[] clArgs = cl.getArgs(); + + File input = new File(clArgs[0]); + String memPoolLib = clArgs[1]; + byte[] srcMac = AddressUtils.stringToMACAddress(clArgs[2].trim()); + byte[] dstMac = AddressUtils.stringToMACAddress(clArgs[3].trim()); + byte[] srcIp = AddressUtils.stringToIp4Bytes(clArgs[4].trim()); + byte[] dstIp = AddressUtils.stringToIp4Bytes(clArgs[5].trim()); + int verbose = parseInt(cl.getOptionValue("v", "0")); + int port = Integer.parseInt(cl.getOptionValue("p", "2055")); + int loops = Integer.parseInt(cl.getOptionValue("l", "1")); + int batchSize = Integer.parseInt(cl.getOptionValue("bs", "0")); + int batchType = Integer.parseInt(cl.getOptionValue("bt", "0")); + int numThreads = Integer.parseInt(cl.getOptionValue("th", "0")); + boolean cleanup = !cl.hasOption("skipCleanup"); + + String[] eal_args = {"-l", "1,2", "-n", "4", "--proc-type=auto", "-d", memPoolLib}; + String[] app_args = {"--", "-p", "1", "--num-queues=2", "--proc-id=0"}; + DpdkAO.setVerbose(verbose); + DpdkAO.initEal(eal_args); + DpdkAO.initApp(app_args); + + List<byte[]> frames = readUdpFrames(input); + for (byte[] frame : frames) { + processFrame(frame, srcMac, dstMac, srcIp, dstIp, 51340, port); + } + + System.out.println("CLIENT: DPDK JNI build info: " + DpdkAO.buildInfo()); + System.out.println("CLIENT: sending file " + input.getAbsolutePath()); + DpdkBatch batch = DpdkBatch.resolve(batchSize, frames, batchType); + if (numThreads == 0) { + batch.run(loops); + } else { + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> batch.run(loops)); + } + for (int i = 0; i < numThreads; i++) { + threads[i].start(); + } + for (int i = 0; i < numThreads; i++) { + threads[i].join(); + } + } + if (cleanup) { + DpdkAO.cleanApp(); + } + } + + static int parseInt(String s) { + s = s.toLowerCase().trim(); + if (s.startsWith("0x")) { + return Integer.parseInt(s.substring(2), 16); + } else if (s.startsWith("0b")) { + return Integer.parseInt(s.substring(2), 2); + } else { + return Integer.parseInt(s, 10); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/NetflowPcapClient.java Tue Aug 11 10:27:28 2020 +0200 @@ -0,0 +1,68 @@ +package com.passus.st; + +import com.passus.net.utils.AddressUtils; +import com.passus.pcap.Pcap; +import java.io.File; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + +import static com.passus.st.FrameUtils.*; + +/** + * + * @author mikolaj.podbielski + */ +public class NetflowPcapClient extends NetflowClientBase { + + public static void main(String... args) throws Exception { + final Options options = new Options(); + options.addOption("v", "verbose", false, "Verbose mode."); + options.addOption("p", "port", true, "Destination port. [2055]"); + options.addOption("l", "loops", true, "Number of PCAP loops. [1]"); + CommandLine cl = new DefaultParser().parse(options, args); + String[] clArgs = cl.getArgs(); + + File input = new File(clArgs[0]); + String device = clArgs[1]; + byte[] srcMac = AddressUtils.stringToMACAddress(clArgs[2].trim()); + byte[] dstMac = AddressUtils.stringToMACAddress(clArgs[3].trim()); + byte[] srcIp = AddressUtils.stringToIp4Bytes(clArgs[4].trim()); + byte[] dstIp = AddressUtils.stringToIp4Bytes(clArgs[5].trim()); + boolean verbose = cl.hasOption("v"); + int port = Integer.parseInt(cl.getOptionValue("p", "2055")); + int loops = Integer.parseInt(cl.getOptionValue("l", "1")); + + long numBytes = 0; + List<byte[]> frames = readUdpFrames(input); + for (byte[] frame : frames) { + processFrame(frame, srcMac, dstMac, srcIp, dstIp, 51340, port); + numBytes += frame.length; + } + + System.out.println("Sending file: " + input.getAbsolutePath()); + StringBuilder errors = new StringBuilder(); + try (Pcap pcap = Pcap.openLive(device, errors)) { + if (pcap == null) { + System.out.println("Cannot open device: " + device); + System.out.println("Error: " + errors.toString()); + return; + } + + long t0 = System.currentTimeMillis(); + for (int l = 0; l < loops; l++) { + for (byte[] frame : frames) { + pcap.sendPacket(frame.length, frame); + } + } + t0 = System.currentTimeMillis() - t0; + float mbps = ((numBytes / MEGA) * loops) / (t0 / 8000.f); + float fps = (frames.size() * loops) / (t0 / 1000.f); + System.out.format("CLIENT sent %d loops of %d frames / %d bytes\n", loops, frames.size(), numBytes); + System.out.format("total %d bytes in %d ms (%f Mbit/s)\n", numBytes * loops, t0, mbps); + System.out.format("total %d frames in %d ms (%f f/s)\n", frames.size() * loops, t0, fps); + } + } + +}