Mercurial > stress-tester
changeset 1072:0cafeaef7e1f
UdpFrameTemplate, UnidirectionalRawPacketWorker - frame dynamic modification in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketEmitter.java Tue Apr 28 13:51:31 2020 +0200 @@ -1,9 +1,11 @@ package com.passus.st.emitter.raw; import com.passus.commons.annotations.Plugin; +import com.passus.config.annotations.NodeDefinitionCreate; import com.passus.dpdk.DpdkAO; import com.passus.st.plugin.PluginConstants; +@NodeDefinitionCreate(UnidirectionalRawPacketEmitter.UnidirectionalRawPacketEmitterNodeDefCreator.class) @Plugin(name = DpdkUnidirectionalRawPacketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class DpdkUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<DpdkAO> {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Tue Apr 28 13:51:31 2020 +0200 @@ -4,7 +4,6 @@ import java.io.IOException; import java.net.NetworkInterface; -import java.util.Queue; public class DpdkUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<DpdkAO> { @@ -37,20 +36,13 @@ } @Override - protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext) throws IOException { - Queue<byte[]> queue = channelContext.getDataQueue(); - int written = 0; - while (!queue.isEmpty()) { - byte[] buf = queue.poll(); - int res = DpdkAO.sendPacket(buf.length, buf); - if (res >= 0) { - written += res; - } else { - throw new IOException("Unable to send packet. DPDK error."); - } + protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext, byte[] frame, int length) throws IOException { + int res = DpdkAO.sendPacket(length, frame); + if (res < 0) { + throw new IOException("Unable to send packet. DPDK error."); } - return written; + return res; } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitter.java Tue Apr 28 13:51:31 2020 +0200 @@ -1,8 +1,10 @@ package com.passus.st.emitter.raw; import com.passus.commons.annotations.Plugin; +import com.passus.config.annotations.NodeDefinitionCreate; import com.passus.st.plugin.PluginConstants; +@NodeDefinitionCreate(UnidirectionalRawPacketEmitter.UnidirectionalRawPacketEmitterNodeDefCreator.class) @Plugin(name = PcapUnidirectionalRawPacketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class PcapUnidirectionalRawPacketEmitter extends UnidirectionalRawPacketEmitter<PcapOutput> {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketWorker.java Tue Apr 28 13:51:31 2020 +0200 @@ -10,7 +10,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.Queue; class PcapUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<PcapOutput> { @@ -57,21 +56,14 @@ } @Override - protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext) throws IOException { + protected int doWrite0(UnidirectionalRawPacketChannelContext<PcapOutput> channelContext, byte[] frame, int length) throws IOException { PcapOutput pcap = channelContext.getEngine(); - Queue<byte[]> queue = channelContext.getDataQueue(); - int written = 0; - while (!queue.isEmpty()) { - byte[] buf = queue.poll(); - int res = pcap.sendPacket(buf.length, buf); - if (res == 0) { - written += buf.length; - } else { - throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr()); - } + int res = pcap.sendPacket(length, frame); + if (res != 0) { + throw new IOException("Unable to send packet. Pcap error: " + pcap.pcap().getErr()); } - return written; + return res; } @Override @@ -87,7 +79,6 @@ } } - private static class PcapInstance { private final PcapOutput pcap;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UdpFrameTemplate.java Tue Apr 28 13:51:31 2020 +0200 @@ -0,0 +1,53 @@ +package com.passus.st.emitter.raw; + +import com.passus.data.DataUtils; + +import static com.passus.net.FrameUtils.*; + +public class UdpFrameTemplate { + + final byte[] frame; + + final int ipv4Offset; + + final int udpOffset; + + final int headerLength; + + final int payloadOffset; + + final int maxPayloadLength; + + int payloadLength; + + int length; + + public UdpFrameTemplate(byte[] frame, + int ipv4Offset, + int udpOffset, + int payloadOffset) { + this.frame = frame; + this.ipv4Offset = ipv4Offset; + this.udpOffset = udpOffset; + this.payloadOffset = payloadOffset; + this.maxPayloadLength = frame.length - payloadOffset; + headerLength = ETH_HEADER_LENGTH + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH; + } + + public void updatePayload(byte[] payload) { + updatePayload(payload, payload.length); + } + + public void updatePayload(byte[] payload, int length) { + payloadLength = length; + DataUtils.writeInt2(frame, ipv4Offset + 2, length + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH); + DataUtils.writeInt2(frame, udpOffset + 4, length + UDP_HEADER_LENGTH); + + int ipv4CheckSum = calcIp4Checksum(frame); + DataUtils.writeInt2(frame, ipv4Offset + 10, ipv4CheckSum); + System.arraycopy(payload, 0, frame, payloadOffset, payloadLength); + + length = payloadLength + headerLength; + } + +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketChannelContext.java Tue Apr 28 13:51:31 2020 +0200 @@ -22,7 +22,7 @@ final Queue<byte[]> dataQueue; - final byte[] buffer; + UdpFrameTemplate frameTemplate; private FlowContext flowContext; @@ -52,7 +52,6 @@ this.remoteAddress = remoteAddress; this.localHardwareAddress = localHardwareAddress; this.remoteHardwareAddress = remoteHardwareAddress; - this.buffer = new byte[DEFAULT_BUFFER_SIZE]; } public E getEngine() {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Tue Apr 28 13:51:31 2020 +0200 @@ -1,21 +1,32 @@ package com.passus.st.emitter.raw; import com.passus.commons.Assert; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.config.schema.MapNodeDefinition; +import com.passus.config.validation.LongValidator; import com.passus.net.session.Session; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import java.io.IOException; +import static com.passus.config.schema.ConfigurationSchemaBuilder.*; +import static com.passus.config.validation.ValidatonUtils.OP_GREATER; +import static com.passus.net.FrameUtils.*; + public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter { - private static final int DEFAULT_WORKERS_NUM = 4; + private static final int DEFAULT_THREADS = 4; + private static final int DEFAULT_MTU = 1500; + private static final boolean DEFAULT_COLLECT_METRICS = true; + private static final int MIN_MTU = ETH_HEADER_LENGTH + IPV4_HEADER_LENGTH + UDP_HEADER_LENGTH; private boolean started = false; private UnidirectionalRawPacketWorker<E>[] workers; - private int workersNum = DEFAULT_WORKERS_NUM; + private int threads = DEFAULT_THREADS; private PortPool portPool = new SynchronizedPortPoolWrapper(new PortPoolImpl()); @@ -25,6 +36,8 @@ private boolean collectMetrics = false; + private int mtu = DEFAULT_MTU; + @Override public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; @@ -35,13 +48,25 @@ return sessionMapper; } - public int getWorkersNum() { - return workersNum; + public int getThreads() { + return threads; } - public void setWorkersNum(int workersNum) { - Assert.greaterThanZero(workersNum, "workersNum"); - this.workersNum = workersNum; + public void setThreads(int threads) { + Assert.greaterThanZero(threads, "threads"); + this.threads = threads; + } + + public int getMtu() { + return mtu; + } + + public void setMtu(int mtu) { + if (mtu < MIN_MTU) { + throw new IllegalArgumentException("MTU should be greater than " + MIN_MTU + "."); + } + + this.mtu = mtu; } public MACAddressResolver getMacResolver() { @@ -49,6 +74,7 @@ } public void setMacResolver(MACAddressResolver macResolver) { + Assert.notNull(macResolver, "macResolver"); this.macResolver = macResolver; } @@ -73,7 +99,16 @@ @Override public void writeMetrics(MetricsContainer container) { - throw new RuntimeException("Not implemented yet."); + + } + + @Override + public void configure(Configuration config, ConfigurationContext context) { + Emitter.super.configure(config, context); + setMacResolver((MACAddressResolver) config.get("macResolver")); + setThreads(config.getInteger("threads", DEFAULT_THREADS)); + setMtu(config.getInteger("mtu", DEFAULT_MTU)); + setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); } @Override @@ -91,12 +126,13 @@ return; } - workers = createWorkersArray(workersNum); - for (int i = 0; i < workersNum; i++) { + workers = createWorkersArray(threads); + for (int i = 0; i < threads; i++) { UnidirectionalRawPacketWorker<E> worker = createWorker(); worker.setPortPool(portPool); worker.setMacResolver(macResolver); worker.setSessionMapper(sessionMapper); + worker.setMtu(mtu); worker.start(); workers[i] = worker; } @@ -110,7 +146,7 @@ return; } - for (int i = 0; i < workersNum; i++) { + for (int i = 0; i < threads; i++) { UnidirectionalRawPacketWorker<E> worker = workers[i]; worker.working = false; worker.interrupt(); @@ -129,7 +165,7 @@ @Override public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) { - throw new IllegalArgumentException("Only UDP transport supported."); + throw new IOException("Only UDP transport supported."); } int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; @@ -137,4 +173,20 @@ workers[index].connect(sessionInfo, handler); } + public static class UnidirectionalRawPacketEmitterNodeDefCreator extends EmitterNodeDefCreator { + + @Override + public MapNodeDefinition create() { + MapNodeDefinition def = super.create(); + def.add( + tupleDef("threads", valueDefInteger().addValidator(LongValidator.GREATER_ZERO)).setRequired(false), + tupleDef("mtu", valueDefInteger() + .addValidator(new LongValidator((long) MIN_MTU, OP_GREATER)) + ).setRequired(false), + tupleDef("macResolver", MACAddressResolverNodeDefinitionCreator.createDef()), + tupleDef("collectMetrics", valueDefBool()).setRequired(false) + ); + return def; + } + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Tue Apr 28 13:51:31 2020 +0200 @@ -1,9 +1,13 @@ package com.passus.st.emitter.raw; import com.passus.commons.Assert; +import com.passus.data.DataUtils; +import com.passus.net.FrameUtils; import com.passus.net.IpAddress; import com.passus.net.MACAddress; import com.passus.net.SocketAddress; +import com.passus.net.packet.Ethernet; +import com.passus.net.packet.Ip; import com.passus.st.emitter.*; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; @@ -19,15 +23,21 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import static com.passus.net.FrameUtils.ETH_HEADER_LENGTH; +import static com.passus.net.FrameUtils.IPV4_HEADER_LENGTH; import static com.passus.net.utils.AddressUtils.jdkInetAddresToIpAddress; import static com.passus.st.emitter.EmitterUtils.getConnectionParams; import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; public abstract class UnidirectionalRawPacketWorker<E> extends Thread implements MetricSource { + protected final Logger LOGGER = LogManager.getLogger(getClass()); + protected static final byte[] ZERO_MAC = {0, 0, 0, 0, 0, 0}; - protected final Logger LOGGER = LogManager.getLogger(UnidirectionalRawPacketWorker.class); + protected static final int IP_OFFSET = ETH_HEADER_LENGTH; + + protected static final int UDP_OFFSET = IP_OFFSET + IPV4_HEADER_LENGTH; boolean working = true; @@ -45,6 +55,8 @@ private MACAddressResolver macResolver; + private int mtu; + public PortPool getPortPool() { return portPool; } @@ -72,6 +84,16 @@ this.sessionMapper = sessionMapper; } + public int getMtu() { + return mtu; + } + + public void setMtu(int mtu) { + Assert.greaterThanZero(mtu, "mtu"); + this.mtu = mtu; + } + + @Override public boolean isCollectMetrics() { return collectMetrics; @@ -151,6 +173,28 @@ protected abstract E doInitEngine(UnidirectionalRawPacketChannelContext<E> channelContext, String device) throws IOException; + private void prepareFrameTemplate(UnidirectionalRawPacketChannelContext channelContext) { + SocketAddress srcAddress = channelContext.getLocalAddress(); + byte[] srcMac = channelContext.getLocalHardwareAddress().getAddress(); + int[] srcpIp = srcAddress.getIp().getAddress(); + byte[] srcpIpBytes = DataUtils.intsToBytesBE(srcpIp); + int srcPort = srcAddress.getPort(); + + SocketAddress dstAddress = channelContext.getRemoteAddress(); + byte[] dstMac = channelContext.getRemoteHardwareAddress().getAddress(); + int[] dstIp = dstAddress.getIp().getAddress(); + byte[] dstIpBytes = DataUtils.intsToBytesBE(dstIp); + int dstPort = dstAddress.getPort(); + + byte[] frame = new byte[mtu]; + int offset = 0; + offset += FrameUtils.writeEth(srcMac, dstMac, Ethernet.TYPE_IPV4, frame, offset); + offset += FrameUtils.writeIpv4(srcpIpBytes, dstIpBytes, 0, (byte) Ip.PROTO_UDP, frame, offset); + offset += FrameUtils.writeUdp(srcPort, dstPort, 0, frame, offset); + + channelContext.frameTemplate = new UdpFrameTemplate(frame, IP_OFFSET, UDP_OFFSET, offset); + } + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); if (connParams == null) { @@ -196,8 +240,8 @@ MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); channelContext = new UnidirectionalRawPacketChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); + prepareFrameTemplate(channelContext); sessions.put(sessionInfo, channelContext); - try { handler.channelRegistered(channelContext); } catch (Exception ex) { @@ -214,8 +258,8 @@ } } - protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext) throws IOException; - + protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext, byte[] frame, int length) throws IOException; + private void doWrite(SessionInfo sessionInfo) { UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo); if (channelContext == null) { @@ -236,7 +280,16 @@ int written = 0; try { - written = doWrite0(channelContext); + while (!queue.isEmpty()) { + byte[] buf = queue.poll(); + if (channelContext.frameTemplate.maxPayloadLength >= buf.length) { + UdpFrameTemplate ft = channelContext.frameTemplate; + ft.updatePayload(buf); + written += doWrite0(channelContext, ft.frame, ft.length); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug("payload.length {} > maxPayloadLength {}", buf.length, channelContext.frameTemplate.maxPayloadLength); + } + } } catch (Exception e) { doCatchException(channelContext, e); doClose(channelContext);
--- a/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java Tue Apr 28 09:57:36 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/emitter/raw/PcapUnidirectionalRawPacketEmitterTest.java Tue Apr 28 13:51:31 2020 +0200 @@ -9,7 +9,6 @@ import com.passus.st.emitter.MapBasedMACAddressResolver; import com.passus.st.emitter.SessionInfo; import com.passus.st.emitter.SessionMapper; -import com.passus.st.emitter.raw.PcapUnidirectionalRawPacketEmitter; import org.testng.AssertJUnit; import org.testng.annotations.Test; @@ -21,7 +20,7 @@ macResolver.add("* -> 00:00:00:00:00:00"); PcapUnidirectionalRawPacketEmitter emitter = new PcapUnidirectionalRawPacketEmitter(); - emitter.setWorkersNum(1); + emitter.setThreads(1); emitter.setMacResolver(macResolver); if (mapper != null) { emitter.setSessionMapper(mapper);