Mercurial > stress-tester
changeset 1087:2600ca3bdfd6
DpdkUnidirectionalRawPacketWorker.onConnect overridden
author | Devel 2 |
---|---|
date | Thu, 07 May 2020 13:56:28 +0200 |
parents | 969427aad169 |
children | dd193de63d88 |
files | stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java |
diffstat | 2 files changed, 61 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Thu May 07 13:27:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/DpdkUnidirectionalRawPacketWorker.java Thu May 07 13:56:28 2020 +0200 @@ -1,16 +1,25 @@ package com.passus.st.emitter.raw; import com.passus.dpdk.DpdkAO; +import com.passus.net.MACAddress; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; import java.io.IOException; import java.net.NetworkInterface; +import static com.passus.st.emitter.EmitterUtils.getConnectionParams; + public class DpdkUnidirectionalRawPacketWorker extends UnidirectionalRawPacketWorker<DpdkAO> { private DpdkAO dpdkAO; private int dpdkInitCalls = 0; + private MACAddress localMac = new MACAddress(ZERO_MAC); + @Override protected String resolveDevice(NetworkInterface networkInterface) throws IOException { return null; @@ -36,6 +45,50 @@ } @Override + protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); + if (connParams == null) { + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registering UDP session '{}'. 
Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + SocketAddress localAddress = connParams.getBindAddress(); + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); + UnidirectionalRawPacketChannelContext<DpdkAO> channelContext = new UnidirectionalRawPacketChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); + prepareFrameTemplate(channelContext); + sessions.put(sessionInfo, channelContext); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + try { + DpdkAO engine = doInitEngine(channelContext, null); + channelContext.setEngine(engine); + channelContext.setDevice("DPDK"); + handler.channelActive(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + if (collectMetrics) { + synchronized (metric) { + metric.addRemoteSocketConnection(remoteAddress); + metric.addBindSocket(localAddress); + } + } + } + + @Override protected int doWrite0(UnidirectionalRawPacketChannelContext<DpdkAO> channelContext, byte[] frame, int length) throws IOException { int res = DpdkAO.sendPacket(length, frame); if (res < 0) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Thu May 07 13:27:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Thu May 07 13:56:28 2020 +0200 @@ -45,17 +45,17 @@ protected final BlockingQueue<UnidirectionalTasks.UnidirectionalTask> tasks = new LinkedBlockingQueue<>(); - private Map<SessionInfo, UnidirectionalRawPacketChannelContext<E>> sessions = new HashMap<>(); + protected Map<SessionInfo, UnidirectionalRawPacketChannelContext<E>> sessions = new HashMap<>(); - private SessionMapper sessionMapper; + protected SessionMapper sessionMapper; - private volatile boolean collectMetrics = false; + protected volatile boolean collectMetrics = false; - private EmitterMetric metric; + protected EmitterMetric metric; private PortPool portPool = new PortPoolImpl(); - private MACAddressResolver macResolver; + protected MACAddressResolver macResolver; private int mtu = DEFAULT_MTU; @@ -126,7 +126,7 @@ } } - private void doCatchException(UnidirectionalRawPacketChannelContext<E> channelContext, Throwable cause) { + protected void doCatchException(UnidirectionalRawPacketChannelContext<E> channelContext, Throwable cause) { EmitterHandler handler = channelContext == null ? 
null : channelContext.getHandler(); EmitterUtils.doCatchException(channelContext, cause, handler, collectMetrics, metric, LOGGER); } @@ -187,7 +187,7 @@ protected abstract E doInitEngine(UnidirectionalRawPacketChannelContext<E> channelContext, String device) throws IOException; - private void prepareFrameTemplate(UnidirectionalRawPacketChannelContext channelContext) { + protected void prepareFrameTemplate(UnidirectionalRawPacketChannelContext channelContext) { SocketAddress srcAddress = channelContext.getLocalAddress(); byte[] srcMac = channelContext.getLocalHardwareAddress().getAddress(); int[] srcpIp = srcAddress.getIp().getAddress(); @@ -209,7 +209,7 @@ channelContext.frameTemplate = new UdpFrameTemplate(frame, IP_OFFSET, UDP_OFFSET, offset); } - private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { SessionMapper.ConnectionParams connParams = getConnectionParams(sessionInfo, handler, sessionMapper, collectMetrics, metric, LOGGER); if (connParams == null) { return;