Mercurial > stress-tester
changeset 1119:9bae1220fef2
DpdkUnidirectionalRawPacketWorker - batch mode
author | Devel 1 |
---|---|
date | Tue, 02 Jun 2020 11:46:12 +0200 |
parents | a9d9c6e9bf19 |
children | 7f40755547ad |
files | stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java |
diffstat | 1 files changed, 54 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Tue Jun 02 11:44:26 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Tue Jun 02 11:46:12 2020 +0200
@@ -1,5 +1,8 @@
 package com.passus.st.emitter.raw;
 
+import com.passus.commons.time.DefaultScheduledTimerService;
+import com.passus.commons.time.ScheduledTimerCallback;
+import com.passus.commons.time.ScheduledTimerService;
 import com.passus.dpdk.DpdkAO;
 import com.passus.net.MACAddress;
 import com.passus.net.SocketAddress;
@@ -24,8 +27,12 @@
 
     private MACAddress localMac = new MACAddress(ZERO_MAC);
 
-    protected int bufferSize = DEFAULT_BUFFER_SIZE;
-    protected long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private long bufferFlushPeriod = DEFAULT_FLUSH_PERIOD;
+    private ScheduledTimerService scheduler;
+    private byte[][] bufferBytes;
+    private int[] bufferLengths;
+    private int bufferPkts;
 
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -104,13 +111,52 @@
     }
 
     @Override
+    protected void onInit() {
+        synchronized (this) {
+            if (bufferSize > 0) {
+                bufferBytes = new byte[bufferSize][];
+                bufferLengths = new int[bufferSize];
+                bufferPkts = 0;
+            }
+        }
+        scheduler = new DefaultScheduledTimerService(getClass().getSimpleName() + "-Scheduler", bufferFlushPeriod);
+        scheduler.setCallback(this::flushBuffer);
+        scheduler.start();
+    }
+
+    @Override
+    protected void onFinish() {
+        if (scheduler != null) {
+            scheduler.stop();
+            scheduler = null;
+        }
+    }
+
+    private void flushBuffer() {
+        synchronized (this) {
+            DpdkAO.sendPackets0(bufferBytes, bufferLengths, bufferPkts);
+            bufferPkts = 0;
+        }
+    }
+
+    @Override
     protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext, byte[] frame, int length) throws IOException {
-        int res = DpdkAO.sendPacket(length, frame);
-        if (res < 0) {
-            throw new IOException("Unable to send packet. DPDK error.");
+        if (bufferSize > 0) {
+            synchronized (this) {
+                bufferBytes[bufferPkts] = frame;
+                bufferLengths[bufferPkts] = length;
+                bufferPkts++;
+                if (bufferPkts == bufferSize) {
+                    flushBuffer();
+                }
+            }
+        } else {
+            int res = DpdkAO.sendPacket(length, frame);
+            if (res < 0) {
+                throw new IOException("Unable to send packet. DPDK error.");
+            }
         }
-
-        return res;
+        return length;
     }
 
     @Override
@@ -124,4 +170,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}