changeset 1116:f8522f874d0f

DpdkEmitter - batch mode in progress
author Devel 1
date Mon, 01 Jun 2020 12:31:34 +0200
parents fdb8d3772440
children 7e874d9af91a
files stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java
diffstat 4 files changed, 56 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java	Thu May 21 14:10:56 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java	Mon Jun 01 12:31:34 2020 +0200
@@ -1,16 +1,37 @@
 package com.passus.st.emitter.raw;
 
 import com.passus.commons.annotations.Plugin;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
 import com.passus.config.annotations.NodeDefinitionCreate;
+import com.passus.config.schema.MapNodeDefinition;
 import com.passus.dpdk.DpdkAO;
 import com.passus.st.plugin.PluginConstants;
 
-@NodeDefinitionCreate(UnidirectionalRawPacketEmitter.UnidirectionalRawPacketEmitterNodeDefCreator.class)
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.st.config.CommonNodeDefs.*;
+import static com.passus.st.emitter.raw.DpdkUnidirectionalRawPacketWorker.*;
+
+@NodeDefinitionCreate(DpdkUnidirectionalRawPacketEmitter.DpdkUnidirectionalRawPacketEmitterNodeDefCreator.class)
 @Plugin(name = DpdkUnidirectionalRawPacketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class DpdkUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<DpdkAO> {
 
     public static final String TYPE = "dpdk";
 
+    protected int bufferSize = DEFAULT_BUFFER_SIZE; // batch size passed to every worker by createWorker(); configure() caps it at MAX_BUFFER_SIZE
+    protected long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD; // passed to every worker by createWorker(); NOTE(review): time unit is not visible here, confirm
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        super.configure(config, context);
+        final int requestedSize = config.getInteger("bufferSize", DEFAULT_BUFFER_SIZE); // absent key falls back to the default
+        if (requestedSize > MAX_BUFFER_SIZE) { // over the limit: warn, the value is capped just below
+            logger.warn("Underlying JNI wrapper for DPDK supports up to {} packets in batch mode.", MAX_BUFFER_SIZE);
+        }
+        bufferSize = Math.min(requestedSize, MAX_BUFFER_SIZE);
+        bufferFlushPeriod = config.getLong("bufferFlushPeriod", DEFAULT_FLUSH_PERIOD);
+    }
+
     @Override
     protected UnidirectionalRawPacketWorker<DpdkAO>[] createWorkersArray(int workersNum) {
         return new DpdkUnidirectionalRawPacketWorker[workersNum];
@@ -18,6 +39,23 @@
 
     @Override
     protected UnidirectionalRawPacketWorker<DpdkAO> createWorker() {
-        return new DpdkUnidirectionalRawPacketWorker();
+        final DpdkUnidirectionalRawPacketWorker dpdkWorker = new DpdkUnidirectionalRawPacketWorker();
+        dpdkWorker.setBufferFlushPeriod(bufferFlushPeriod); // hand the emitter-level batch settings to the new worker
+        dpdkWorker.setBufferSize(bufferSize);
+        return dpdkWorker;
+    }
+
+    public static class DpdkUnidirectionalRawPacketEmitterNodeDefCreator extends UnidirectionalRawPacketEmitterNodeDefCreator { // creator named in the emitter's @NodeDefinitionCreate annotation
+
+        @Override
+        public MapNodeDefinition create() {
+            MapNodeDefinition def = super.create(); // start from the definition built by the parent creator
+            def.add( // register the two keys that configure() reads
+                    tupleDef("bufferSize", INT_GREATER_EQUAL_ZERO_DEF).setRequired(false), // optional; value constraint comes from INT_GREATER_EQUAL_ZERO_DEF (declared elsewhere)
+                    tupleDef("bufferFlushPeriod", LONG_GREATER_THAN_ZERO_DEF).setRequired(false) // optional; value constraint comes from LONG_GREATER_THAN_ZERO_DEF (declared elsewhere)
+            );
+            return def;
+        }
+
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Thu May 21 14:10:56 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Mon Jun 01 12:31:34 2020 +0200
@@ -14,12 +14,27 @@
 
 public class DpdkUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<DpdkAO> {
 
+    public static final int MAX_BUFFER_SIZE = DpdkAO.sendBuffSize(); // obtained from DpdkAO once, when this class is initialized
+    public static final int DEFAULT_BUFFER_SIZE = 0; // NOTE(review): 0 presumably means batching is off by default, confirm against the send path
+    public static final long DEFAULT_FLUSH_PERIOD = 50; // NOTE(review): time unit is not visible in this changeset (milliseconds?), confirm
+
     private DpdkAO dpdkAO;
 
     private int dpdkInitCalls = 0;
 
     private MACAddress localMac = new MACAddress(ZERO_MAC);
 
+    protected int bufferSize = DEFAULT_BUFFER_SIZE; // per-worker batch size; overwritten through setBufferSize()
+    protected long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD; // per-worker flush period; overwritten through setBufferFlushPeriod()
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize; // stored as given; no range check against MAX_BUFFER_SIZE is done here
+    }
+
+    public void setBufferFlushPeriod(long bufferFlushPeriod) {
+        this.bufferFlushPeriod = bufferFlushPeriod; // stored as given; no validation is done here
+    }
+
     @Override
     protected String resolveDevice(NetworkInterface networkInterface) throws IOException {
         return null;
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java	Thu May 21 14:10:56 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java	Mon Jun 01 12:31:34 2020 +0200
@@ -28,6 +28,7 @@
         this.dumpFile = dumpFile;
     }
 
+    @Override
     protected String resolveDevice(NetworkInterface ifc) throws SocketException {
         //Windows workaround
         if (SystemUtils.IS_OS_WINDOWS) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Thu May 21 14:10:56 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Mon Jun 01 12:31:34 2020 +0200
@@ -153,15 +153,6 @@
         return sessions.containsKey(sessionInfo);
     }
 
-    private UnidirectionalRawPacketChannelContext<E> findContext(SessionInfo sessionInfo) {
-        UnidirectionalRawPacketChannelContext<E> context = sessions.get(sessionInfo);
-        if (context == null) {
-            throw new IllegalArgumentException("Unable to find context for session: " + sessionInfo);
-        }
-
-        return context;
-    }
-
     private int nextFreeLocalPort() throws IOException {
         int port = portPool.borrow();
         if (port == -1) {