changeset 1119:9bae1220fef2

DpdkUnidirectionalRawPacketWorker - batch mode
author Devel 1
date Tue, 02 Jun 2020 11:46:12 +0200
parents a9d9c6e9bf19
children 7f40755547ad
files stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java
diffstat 1 files changed, 54 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Tue Jun 02 11:44:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java	Tue Jun 02 11:46:12 2020 +0200
@@ -1,5 +1,8 @@
 package com.passus.st.emitter.raw;
 
+import com.passus.commons.time.DefaultScheduledTimerService;
+import com.passus.commons.time.ScheduledTimerCallback;
+import com.passus.commons.time.ScheduledTimerService;
 import com.passus.dpdk.DpdkAO;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
@@ -24,8 +27,12 @@
 
     private MACAddress localMac = new MACAddress(ZERO_MAC);
 
-    protected int bufferSize = DEFAULT_BUFFER_SIZE;
-    protected long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD;
+    private ScheduledTimerService scheduler;
+    private byte[][] bufferBytes;
+    private int[] bufferLengths;
+    private int bufferPkts;
 
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -104,13 +111,52 @@
     }
 
     @Override
+    protected void onInit() {
+        synchronized (this) {
+            if (bufferSize > 0) {
+                bufferBytes = new byte[bufferSize][];
+                bufferLengths = new int[bufferSize];
+                bufferPkts = 0;
+            }
+        }
+        scheduler = new DefaultScheduledTimerService(getClass().getSimpleName() + "-Scheduler", bufferFlushPeriod);
+        scheduler.setCallback(this::flushBuffer);
+        scheduler.start();
+    }
+
+    @Override
+    protected void onFinish() {
+        if (scheduler != null) {
+            scheduler.stop();
+            scheduler = null;
+        }
+    }
+
+    private void flushBuffer() {
+        synchronized (this) {
+            DpdkAO.sendPackets0(bufferBytes, bufferLengths, bufferPkts);
+            bufferPkts = 0;
+        }
+    }
+
+    @Override
     protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext, byte[] frame, int length) throws IOException {
-        int res = DpdkAO.sendPacket(length, frame);
-        if (res < 0) {
-            throw new IOException("Unable to send packet. DPDK error.");
+        if (bufferSize > 0) {
+            synchronized (this) {
+                bufferBytes[bufferPkts] = frame;
+                bufferLengths[bufferPkts] = length;
+                bufferPkts++;
+                if (bufferPkts == bufferSize) {
+                    flushBuffer();
+                }
+            }
+        } else {
+            int res = DpdkAO.sendPacket(length, frame);
+            if (res < 0) {
+                throw new IOException("Unable to send packet. DPDK error.");
+            }
         }
-
-        return res;
+        return length;
     }
 
     @Override
@@ -124,4 +170,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}