changeset 1072:0cafeaef7e1f

UdpFrameTemplate, UnidirectionalRawPacketWorker - frame dynamic modification in progress
author Devel 2
date Tue, 28 Apr 2020 13:51:31 +0200
parents 06fc08a2d7d5
children 2d7ad6c35a8b
files stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/UdpFrameTemplate.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java
diffstat 9 files changed, 191 insertions(+), 48 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java	Tue Apr 28 13:51:31 2020 +0200
@@ -1,9 +1,11 @@
 package com.passus.st.emitter.raw;
 
 import com.passus.commons.annotations.Plugin;
+import com.passus.config.annotations.NodeDefinitionCreate;
 import com.passus.dpdk.DpdkAO;
 import com.passus.st.plugin.PluginConstants;
 
+@NodeDefinitionCreate(UnidirectionalRawPacketEmitter.UnidirectionalRawPacketEmitterNodeDefCreator.class)
 @Plugin(name = DpdkUnidirectionalRawPacketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class DpdkUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<DpdkAO> {
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Tue Apr 28 13:51:31 2020 +0200
@@ -4,7 +4,6 @@
 
 import java.io.IOException;
 import java.net.NetworkInterface;
-import java.util.Queue;
 
 public class DpdkUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<DpdkAO> {
 
@@ -37,20 +36,22 @@
     }
 
+    /**
+     * Hands a single frame to {@code DpdkAO.sendPacket}.
+     *
+     * @param channelContext channel the frame belongs to (not used by this implementation)
+     * @param frame          buffer holding the frame
+     * @param length         length value passed to {@code DpdkAO.sendPacket} together with the buffer
+     * @return the non-negative result of {@code DpdkAO.sendPacket}
+     * @throws IOException if {@code DpdkAO.sendPacket} returns a negative value
+     */
     @Override
-    protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext) throws IOException {
-        Queue<byte[]> queue = channelContext.getDataQueue();
-        int written = 0;
-        while (!queue.isEmpty()) {
-            byte[] buf = queue.poll();
-            int res = DpdkAO.sendPacket(buf.length, buf);
-            if (res >= 0) {
-                written += res;
-            } else {
-                throw new IOException("Unable to send packet. DPDK error.");
-            }
+    protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext, byte[] frame, int length) throws IOException {
+        int sent = DpdkAO.sendPacket(length, frame);
+        if (sent >= 0) {
+            return sent;
         }
 
-        return written;
+        throw new IOException("Unable to send packet. DPDK error.");
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java	Tue Apr 28 13:51:31 2020 +0200
@@ -1,8 +1,10 @@
 package com.passus.st.emitter.raw;
 
 import com.passus.commons.annotations.Plugin;
+import com.passus.config.annotations.NodeDefinitionCreate;
 import com.passus.st.plugin.PluginConstants;
 
+@NodeDefinitionCreate(UnidirectionalRawPacketEmitter.UnidirectionalRawPacketEmitterNodeDefCreator.class)
 @Plugin(name = PcapUnidirectionalRawPacketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class PcapUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<PcapOutput> {
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java	Tue Apr 28 13:51:31 2020 +0200
@@ -10,7 +10,6 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Queue;
 
 class PcapUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<PcapOutput> {
 
@@ -57,21 +56,25 @@
     }
 
+    /**
+     * Sends one prepared frame through the channel's pcap handle.
+     *
+     * @param channelContext channel whose engine ({@link PcapOutput}) is used for sending
+     * @param frame          buffer holding the frame
+     * @param length         number of bytes of {@code frame} to send
+     * @return {@code length}, i.e. the number of bytes handed to pcap
+     * @throws IOException if {@code sendPacket} reports a non-zero status
+     */
     @Override
-    protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) throws IOException {
+    protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext, byte[] frame, int length) throws IOException {
         PcapOutput pcap = channelContext.getEngine();
-        Queue<byte[]> queue = channelContext.getDataQueue();
-        int written = 0;
-        while (!queue.isEmpty()) {
-            byte[] buf = queue.poll();
-            int res = pcap.sendPacket(buf.length, buf);
-            if (res == 0) {
-                written += buf.length;
-            } else {
-                throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr());
-            }
+        int res = pcap.sendPacket(length, frame);
+        if (res != 0) {
+            throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr());
         }
 
-        return written;
+        // sendPacket reports 0 on success, so the byte count is the frame length
+        // (the removed loop added buf.length per packet for the same reason).
+        return length;
     }
 
     @Override
@@ -87,7 +79,6 @@
         }
     }
 
-
     private static class PcapInstance {
 
         private final PcapOutput pcap;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UdpFrameTemplate.java	Tue Apr 28 13:51:31 2020 +0200
@@ -0,0 +1,98 @@
+package com.passus.st.emitter.raw;
+
+import com.passus.data.DataUtils;
+
+import static com.passus.net.FrameUtils.*;
+
+/**
+ * Mutable frame buffer for sending UDP datagrams that share the same headers.
+ * <p>
+ * The caller supplies a buffer and the offsets of the IPv4 header, UDP header and payload.
+ * {@link #updatePayload(byte[], int)} rewrites the length-dependent header fields and the
+ * payload in place.
+ */
+public class UdpFrameTemplate {
+
+    final byte[] frame;
+
+    final int ipv4Offset;
+
+    final int udpOffset;
+
+    /** Sum of the Ethernet, IPv4 and UDP header length constants. */
+    final int headerLength;
+
+    final int payloadOffset;
+
+    /** Number of bytes available in {@link #frame} from {@link #payloadOffset} to its end. */
+    final int maxPayloadLength;
+
+    /** Payload length passed to the most recent update. */
+    int payloadLength;
+
+    /** {@link #headerLength} plus {@link #payloadLength} after the most recent update. */
+    int length;
+
+    /**
+     * Creates a template over a caller-supplied buffer.
+     *
+     * @param frame         buffer that is modified in place by the update methods
+     * @param ipv4Offset    offset of the IPv4 header in {@code frame}
+     * @param udpOffset     offset of the UDP header in {@code frame}
+     * @param payloadOffset offset at which payload bytes are written
+     */
+    public UdpFrameTemplate(byte[] frame,
+                            int ipv4Offset,
+                            int udpOffset,
+                            int payloadOffset) {
+        this.frame = frame;
+        this.ipv4Offset = ipv4Offset;
+        this.udpOffset = udpOffset;
+        this.payloadOffset = payloadOffset;
+        this.maxPayloadLength = frame.length - payloadOffset;
+        headerLength = ETH_HEADER_LENGTH + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH;
+    }
+
+    /**
+     * Writes the whole {@code payload} array into the template.
+     *
+     * @param payload payload bytes
+     * @throws IllegalArgumentException if the payload does not fit into the frame
+     */
+    public void updatePayload(byte[] payload) {
+        updatePayload(payload, payload.length);
+    }
+
+    /**
+     * Writes the first {@code length} bytes of {@code payload} into the template and updates the
+     * IPv4 total length, UDP length, IPv4 header checksum and {@link #length}.
+     *
+     * @param payload source of the payload bytes
+     * @param length  number of bytes to copy from the start of {@code payload}
+     * @throws IllegalArgumentException if {@code length} is negative, exceeds {@code payload.length}
+     *                                  or exceeds {@link #maxPayloadLength}
+     */
+    public void updatePayload(byte[] payload, int length) {
+        // Validate before touching the buffer so a rejected payload leaves the template intact.
+        if (length < 0 || length > payload.length || length > maxPayloadLength) {
+            throw new IllegalArgumentException("Invalid payload length " + length
+                    + " (payload.length=" + payload.length
+                    + ", maxPayloadLength=" + maxPayloadLength + ").");
+        }
+
+        payloadLength = length;
+        DataUtils.writeInt2(frame, ipv4Offset + 2, length + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH);
+        DataUtils.writeInt2(frame, udpOffset + 4, length + UDP_HEADER_LENGTH);
+
+        // The template is reused, so clear the previous checksum first: the IPv4 checksum is
+        // defined over a header whose checksum field is zero.
+        DataUtils.writeInt2(frame, ipv4Offset + 10, 0);
+        int ipv4CheckSum = calcIp4Checksum(frame);
+        DataUtils.writeInt2(frame, ipv4Offset + 10, ipv4CheckSum);
+        System.arraycopy(payload, 0, frame, payloadOffset, payloadLength);
+
+        // The parameter "length" shadows the field, so the field has to be qualified.
+        this.length = payloadLength + headerLength;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java	Tue Apr 28 13:51:31 2020 +0200
@@ -22,7 +22,7 @@
 
     final Queue<byte[]> dataQueue;
 
-    final byte[] buffer;
+    UdpFrameTemplate frameTemplate;
 
     private FlowContext flowContext;
 
@@ -52,7 +52,6 @@
         this.remoteAddress = remoteAddress;
         this.localHardwareAddress = localHardwareAddress;
         this.remoteHardwareAddress = remoteHardwareAddress;
-        this.buffer = new byte[DEFAULT_BUFFER_SIZE];
     }
 
     public E getEngine() {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Tue Apr 28 13:51:31 2020 +0200
@@ -1,21 +1,32 @@
 package com.passus.st.emitter.raw;
 
 import com.passus.commons.Assert;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.schema.MapNodeDefinition;
+import com.passus.config.validation.LongValidator;
 import com.passus.net.session.Session;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 
 import java.io.IOException;
 
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+import static com.passus.config.validation.ValidatonUtils.OP_GREATER;
+import static com.passus.net.FrameUtils.*;
+
 public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter {
 
-    private static final int DEFAULT_WORKERS_NUM = 4;
+    private static final int DEFAULT_THREADS = 4;
+    private static final int DEFAULT_MTU = 1500;
+    private static final boolean DEFAULT_COLLECT_METRICS = true;
+    private static final int MIN_MTU = ETH_HEADER_LENGTH + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH;
 
     private boolean started = false;
 
     private UnidirectionalRawPacketWorker<E>[] workers;
 
-    private int workersNum = DEFAULT_WORKERS_NUM;
+    private int threads = DEFAULT_THREADS;
 
     private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl());
 
@@ -25,6 +36,8 @@
 
     private boolean collectMetrics = false;
 
+    private int mtu = DEFAULT_MTU;
+
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
@@ -35,13 +48,36 @@
         return sessionMapper;
     }
 
-    public int getWorkersNum() {
-        return workersNum;
+    public int getThreads() {
+        return threads;
     }
 
-    public void setWorkersNum(int workersNum) {
-        Assert.greaterThanZero(workersNum, "workersNum");
-        this.workersNum = workersNum;
+    /**
+     * Sets how many workers are created when the emitter is started.
+     *
+     * @param threads number of workers; must be greater than zero
+     */
+    public void setThreads(int threads) {
+        Assert.greaterThanZero(threads, "threads");
+        this.threads = threads;
+    }
+
+    public int getMtu() {
+        return mtu;
+    }
+
+    /**
+     * Sets the MTU value that is passed to every worker on start.
+     *
+     * @param mtu size in bytes; must be at least the combined Ethernet, IPv4 and UDP header length
+     * @throws IllegalArgumentException if {@code mtu} is smaller than that minimum
+     */
+    public void setMtu(int mtu) {
+        if (mtu < MIN_MTU) {
+            throw new IllegalArgumentException("MTU should be greater than or equal to " + MIN_MTU + ".");
+        }
+
+        this.mtu = mtu;
     }
 
     public MACAddressResolver getMacResolver() {
@@ -49,6 +74,7 @@
     }
 
     public void setMacResolver(MACAddressResolver macResolver) {
+        Assert.notNull(macResolver, "macResolver");
         this.macResolver = macResolver;
     }
 
@@ -73,7 +99,16 @@
 
     @Override
     public void writeMetrics(MetricsContainer container) {
-        throw new RuntimeException("Not implemented yet.");
+        // No emitter-level metrics are written yet; the call is a no-op.
+    }
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        Emitter.super.configure(config, context);
+        setMacResolver((MACAddressResolver) config.get("macResolver"));
+        setThreads(config.getInteger("threads", DEFAULT_THREADS));
+        setMtu(config.getInteger("mtu", DEFAULT_MTU));
+        setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
     }
 
     @Override
@@ -91,12 +126,13 @@
             return;
         }
 
-        workers = createWorkersArray(workersNum);
-        for (int i = 0; i < workersNum; i++) {
+        workers = createWorkersArray(threads);
+        for (int i = 0; i < threads; i++) {
             UnidirectionalRawPacketWorker<E> worker = createWorker();
             worker.setPortPool(portPool);
             worker.setMacResolver(macResolver);
             worker.setSessionMapper(sessionMapper);
+            worker.setMtu(mtu);
             worker.start();
             workers[i] = worker;
         }
@@ -110,7 +146,7 @@
             return;
         }
 
-        for (int i = 0; i < workersNum; i++) {
+        for (int i = 0; i < threads; i++) {
             UnidirectionalRawPacketWorker<E> worker = workers[i];
             worker.working = false;
             worker.interrupt();
@@ -129,7 +165,7 @@
     @Override
     public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
         if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) {
-            throw new IllegalArgumentException("Only UDP transport supported.");
+            throw new IOException("Only UDP transport supported.");
         }
 
         int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
@@ -137,4 +173,20 @@
         workers[index].connect(sessionInfo, handler);
     }
 
+    public static class UnidirectionalRawPacketEmitterNodeDefCreator extends EmitterNodeDefCreator {
+
+        @Override
+        public MapNodeDefinition create() {
+            MapNodeDefinition def = super.create();
+            def.add(
+                    tupleDef("threads", valueDefInteger().addValidator(LongValidator.GREATER_ZERO)).setRequired(false),
+                    tupleDef("mtu", valueDefInteger()
+                            .addValidator(new LongValidator((long) MIN_MTU, OP_GREATER))
+                    ).setRequired(false),
+                    tupleDef("macResolver", MACAddressResolverNodeDefinitionCreator.createDef()),
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false)
+            );
+            return def;
+        }
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Tue Apr 28 13:51:31 2020 +0200
@@ -1,9 +1,13 @@
 package com.passus.st.emitter.raw;
 
 import com.passus.commons.Assert;
+import com.passus.data.DataUtils;
+import com.passus.net.FrameUtils;
 import com.passus.net.IpAddress;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
+import com.passus.net.packet.Ethernet;
+import com.passus.net.packet.Ip;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
@@ -19,15 +23,21 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static com.passus.net.FrameUtils.ETH_HEADER_LENGTH;
+import static com.passus.net.FrameUtils.IPV4_HEADER_LENGTH;
 import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress;
 import static com.passus.st.emitter.EmitterUtils.getConnectionParams;
 import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
 
 public abstract class UnidirectionalRawPacketWorker<E> extends Thread implements MetricSource {
 
+    protected final Logger LOGGER = LogManager.getLogger(getClass());
+
     protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0};
 
-    protected final Logger LOGGER = LogManager.getLogger(UnidirectionalRawPacketWorker.class);
+    protected static final int IP_OFFSET = ETH_HEADER_LENGTH;
+
+    protected static final int UDP_OFFSET = IP_OFFSET + IPV4_HEADER_LENGTH;
 
     boolean working = true;
 
@@ -45,6 +55,8 @@
 
     private MACAddressResolver macResolver;
 
+    private int mtu;
+
     public PortPool getPortPool() {
         return portPool;
     }
@@ -72,6 +84,28 @@
         this.sessionMapper = sessionMapper;
     }
 
+    public int getMtu() {
+        return mtu;
+    }
+
+    /**
+     * Sets the size of the frame buffer allocated for each channel's frame template.
+     *
+     * @param mtu frame size in bytes; must hold at least the Ethernet, IPv4 and UDP headers
+     * @throws IllegalArgumentException if {@code mtu} is too small to hold the headers
+     */
+    public void setMtu(int mtu) {
+        Assert.greaterThanZero(mtu, "mtu");
+        // The frame template writes all three headers into a buffer of this size,
+        // so anything smaller would overflow the buffer when a channel is connected.
+        int minMtu = UDP_OFFSET + FrameUtils.UDP_HEADER_LENGTH;
+        if (mtu < minMtu) {
+            throw new IllegalArgumentException("MTU should be greater than or equal to " + minMtu + ".");
+        }
+
+        this.mtu = mtu;
+    }
+
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
@@ -151,6 +173,38 @@
 
     protected abstract E doInitEngine(UnidirectionalRawPacketChannelContext<E> channelContext, String device) throws IOException;
 
+    /**
+     * Creates the frame template of a channel.
+     * <p>
+     * Allocates a buffer of {@code mtu} bytes, writes the Ethernet, IPv4 and UDP headers built from
+     * the channel's local and remote addresses into it and stores the resulting
+     * {@link UdpFrameTemplate} in {@code channelContext.frameTemplate}.
+     *
+     * @param channelContext channel whose addresses are used and whose template is set
+     */
+    private void prepareFrameTemplate(UnidirectionalRawPacketChannelContext<E> channelContext) {
+        SocketAddress srcAddress = channelContext.getLocalAddress();
+        byte[] srcMac = channelContext.getLocalHardwareAddress().getAddress();
+        byte[] srcIpBytes = DataUtils.intsToBytesBE(srcAddress.getIp().getAddress());
+        int srcPort = srcAddress.getPort();
+
+        SocketAddress dstAddress = channelContext.getRemoteAddress();
+        byte[] dstMac = channelContext.getRemoteHardwareAddress().getAddress();
+        byte[] dstIpBytes = DataUtils.intsToBytesBE(dstAddress.getIp().getAddress());
+        int dstPort = dstAddress.getPort();
+
+        byte[] frame = new byte[mtu];
+        int offset = 0;
+        offset += FrameUtils.writeEth(srcMac, dstMac, Ethernet.TYPE_IPV4, frame, offset);
+        // NOTE(review): the literal 0 passed to writeIpv4/writeUdp looks like a placeholder that
+        // UdpFrameTemplate.updatePayload overwrites per datagram - confirm against FrameUtils.
+        offset += FrameUtils.writeIpv4(srcIpBytes, dstIpBytes, 0, (byte) Ip.PROTO_UDP, frame, offset);
+        offset += FrameUtils.writeUdp(srcPort, dstPort, 0, frame, offset);
+
+        // After the three writes, offset is where the payload starts.
+        channelContext.frameTemplate = new UdpFrameTemplate(frame, IP_OFFSET, UDP_OFFSET, offset);
+    }
+
     private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
         SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER);
         if (connParams == null) {
@@ -196,8 +240,8 @@
 
         MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp());
         channelContext = new UnidirectionalRawPacketChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac);
+        prepareFrameTemplate(channelContext);
         sessions.put(sessionInfo, channelContext);
-
         try {
             handler.channelRegistered(channelContext);
         } catch (Exception ex) {
@@ -214,8 +258,8 @@
         }
     }
 
-    protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext) throws IOException;
-
+    protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext, byte[] frame, int length) throws IOException;
+    
     private void doWrite(SessionInfo sessionInfo) {
         UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo);
         if (channelContext == null) {
@@ -236,7 +280,16 @@
 
         int written = 0;
         try {
-            written = doWrite0(channelContext);
+            while (!queue.isEmpty()) {
+                byte[] buf = queue.poll();
+                if (channelContext.frameTemplate.maxPayloadLength >= buf.length) {
+                    UdpFrameTemplate ft = channelContext.frameTemplate;
+                    ft.updatePayload(buf);
+                    written += doWrite0(channelContext, ft.frame, ft.length);
+                } else if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("payload.length {} > maxPayloadLength {}", buf.length, channelContext.frameTemplate.maxPayloadLength);
+                }
+            }
         } catch (Exception e) {
             doCatchException(channelContext, e);
             doClose(channelContext);
--- a/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java	Tue Apr 28 09:57:36 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java	Tue Apr 28 13:51:31 2020 +0200
@@ -9,7 +9,6 @@
 import com.passus.st.emitter.MapBasedMACAddressResolver;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
-import com.passus.st.emitter.raw.PcapUnidirectionalRawPacketEmitter;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -21,7 +20,7 @@
         macResolver.add("* -> 00:00:00:00:00:00");
 
         PcapUnidirectionalRawPacketEmitter emitter = new PcapUnidirectionalRawPacketEmitter();
-        emitter.setWorkersNum(1);
+        emitter.setThreads(1);
         emitter.setMacResolver(macResolver);
         if (mapper != null) {
             emitter.setSessionMapper(mapper);