Mercurial > stress-tester
changeset 971:b429501707ca
PcapSessionEventSource - DNS broadcast in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Mon Jul 22 15:46:58 2019 +0200 @@ -5,6 +5,7 @@ public static final int UNKNOWN = 0; public static final int HTTP = 1; public static final int DNS = 2; + public static final int NETFLOW = 3; private Protocols() { } @@ -15,6 +16,8 @@ return "HTTP"; case DNS: return "DNS"; + case NETFLOW: + return "NETFLOW"; default: return "unknown"; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Mon Jul 22 15:46:58 2019 +0200 @@ -1,7 +1,9 @@ package com.passus.st.client; +import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; +import static com.passus.st.Protocols.DNS; import static com.passus.st.Protocols.HTTP; public class FlowHandlerFactoryImpl implements FlowHandlerFactory { @@ -11,6 +13,9 @@ switch (protocolId) { case HTTP: return new HttpFlowHandler(); + case DNS: + return new DnsFlowHandler(); + } throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Mon Jul 22 15:46:58 2019 +0200 @@ -0,0 +1,76 @@ +package com.passus.st.client.dns; + +import com.passus.commons.Assert; +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandler; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; +import com.passus.st.metric.MetricsContainer; + +import static com.passus.st.Protocols.DNS; + +public class DnsFlowHandler implements FlowHandler, TimeAware { + + private final DnsFlowHandlerDataDecoder decoder; + + private final DnsFlowHandlerDataEncoder encoder; + + TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + + boolean collectMetrics = false; + + public DnsFlowHandler() { + this.decoder = new DnsFlowHandlerDataDecoder(this); + this.encoder = new DnsFlowHandlerDataEncoder(); + } + + @Override + public int getProtocolId() { + return DNS; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { + return decoder; + } + + @Override + public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { + return encoder; + } + + @Override + public void writeMetrics(MetricsContainer container) { + + } + + @Override + public TimeGenerator getTimeGenerator() { + return timeGenerator; + } + + @Override + public void setTimeGenerator(TimeGenerator timeGenerator) { + Assert.notNull(timeGenerator, "timeGenerator"); + this.timeGenerator = timeGenerator; + } + + @Override + public void init(FlowContext flowContext) { + + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java Mon Jul 22 15:46:58 2019 +0200 @@ -0,0 +1,73 @@ +package com.passus.st.client.dns; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.net.dns.Dns; +import com.passus.net.dns.DnsDecoder; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import static com.passus.st.client.FlowUtils.debug; + +public class DnsFlowHandlerDataDecoder implements FlowHandlerDataDecoder<Dns> { + + private static final Logger LOGGER = LogManager.getLogger(DnsFlowHandlerDataDecoder.class); + + private DnsDecoder decoder = new DnsDecoder(); + + private final DnsFlowHandler handler; + + public DnsFlowHandlerDataDecoder(DnsFlowHandler handler) { + this.handler = handler; + } + + @Override + public Dns getResult() { + return decoder.getResult(); + } + + @Override + public int state() { + return decoder.state(); + } + + @Override + public String getLastError() { + return decoder.getLastError(); + } + + @Override + public int decode(ByteBuff buffer, FlowContext flowContext) { + int res = decoder.decode(buffer); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + long now = handler.timeGenerator.currentTimeMillis(); + + if (LOGGER.isDebugEnabled()) { + debug(LOGGER, flowContext, + "Response decoded (size: {} B, downloaded: {} ms)", + res, + now - flowContext.receivedStartTimestamp() + ); + } + + //TODO Dorobic metryki + /*if (client.isCollectMetrics()) { + synchronized (client.metric) { + client.metric.incResponsesNum(); + client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); + client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); + if (lastRequest != null) { + client.metric.addResponseTime(now - (long) 
lastRequest.getTag(TAG_TIME_START)); + } + } + }*/ + + } + + return res; + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataEncoder.java Mon Jul 22 15:46:58 2019 +0200 @@ -0,0 +1,17 @@ +package com.passus.st.client.dns; + +import com.passus.data.ByteBuff; +import com.passus.net.dns.Dns; +import com.passus.net.dns.DnsEncoder; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataEncoder; + +public class DnsFlowHandlerDataEncoder implements FlowHandlerDataEncoder<Dns> { + + private DnsEncoder encoder = new DnsEncoder(); + + @Override + public void encode(Dns dns, FlowContext flowContext, ByteBuff out) { + encoder.encode(dns, out); + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Mon Jul 22 15:46:58 2019 +0200 @@ -1,5 +1,6 @@ package com.passus.st.client.http; +import com.passus.commons.Assert; import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; import com.passus.net.http.HttpRequest; @@ -61,6 +62,7 @@ @Override public void setTimeGenerator(TimeGenerator timeGenerator) { + Assert.notNull(timeGenerator, "timeGenerator"); this.timeGenerator = timeGenerator; }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java Mon Jul 22 15:46:58 2019 +0200 @@ -20,12 +20,12 @@ private final HttpFullMessageDecoder decoder; - private final HttpFlowHandler client; + private final HttpFlowHandler handler; private HttpRequest lastRequest; - public HttpFlowHandlerDataDecoder(HttpFlowHandler client) { - this.client = client; + public HttpFlowHandlerDataDecoder(HttpFlowHandler handler) { + this.handler = handler; decoder = new HttpFullMessageDecoder(); decoder.setDecodeRequest(false); } @@ -67,7 +67,7 @@ int res = decoder.decode(buffer); if (decoder.state() == DataDecoder.STATE_FINISHED) { - long now = client.timeGenerator.currentTimeMillis(); + long now = handler.timeGenerator.currentTimeMillis(); HttpResponse resp = (HttpResponse) decoder.getResult(); if (LOGGER.isDebugEnabled()) { @@ -79,14 +79,14 @@ ); } - if (client.isCollectMetrics()) { - synchronized (client.metric) { - client.metric.incResponsesNum(); - client.metric.addResponseStatusCode(resp.getStatus().getCode()); - client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); - client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); + if (handler.isCollectMetrics()) { + synchronized (handler.metric) { + handler.metric.incResponsesNum(); + handler.metric.addResponseStatusCode(resp.getStatus().getCode()); + handler.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); + handler.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); if (lastRequest != null) { - client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); + handler.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); } } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Mon Jul 22 15:46:58 2019 +0200 @@ -58,7 +58,7 @@ private final Class<? extends NioEmitterWorker> workerClass; public NioEmitter() { - this(NioSocketEmitterWorker.class); + this(NioEmitterWorkerImpl.class); } public NioEmitter(Class<? extends NioEmitterWorker> workerClass) {
// New file in changeset 971:b429501707ca (Mon Jul 22 15:46:58 2019 +0200):
// stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java
package com.passus.st.emitter.nio;

import com.passus.net.SocketAddress;
import com.passus.net.session.Session;
import com.passus.net.utils.AddressUtils;
import com.passus.st.emitter.EmitterHandler;
import com.passus.st.emitter.SessionInfo;
import com.passus.st.emitter.SessionMapper;

import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;

/**
 * Emitter worker that opens a non-blocking {@link SocketChannel} for TCP sessions
 * and a non-blocking {@link DatagramChannel} for UDP sessions, registering each
 * channel with this worker's selector.
 */
public class NioEmitterWorkerImpl extends NioAbstractEmitterWorker {

    public NioEmitterWorkerImpl(int index) throws IOException {
        super(index);
    }

    /**
     * Dispatches the connect request according to the session's transport.
     * Sessions whose transport is neither TCP nor UDP are logged and skipped.
     */
    @Override
    protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
        if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
            doConnectSocket(sessionInfo, handler);
        } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
            doConnectDatagram(sessionInfo, handler);
        } else {
            // Previously such a session was dropped without any trace.
            logger.error("Unsupported transport '{}' for session '{}'. Connection skipped.",
                    sessionInfo.getTransport(), sessionInfo);
        }
    }

    /**
     * Opens a TCP channel for the session, registers it for {@code OP_CONNECT}
     * and starts the connection attempt.
     *
     * <p>Nothing is done when {@code getConnParams} returns {@code null}. If a
     * failure occurs before the channel has been registered with the selector,
     * the channel is closed so that it is not leaked.
     */
    protected void doConnectSocket(SessionInfo sessionInfo, EmitterHandler handler) {
        SocketChannel channel = null;
        boolean registered = false;
        try {
            SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
            if (connParams == null) {
                return;
            }

            channel = SocketChannel.open();
            channel.configureBlocking(false);

            SocketAddress bindAddress = connParams.getBindAddress();
            // NOTE(review): the bind happens only when the mapped address EQUALS
            // ANY_SOCKET; an explicit bind address is never applied. This looks
            // like it may be inverted - confirm against SessionMapper.
            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
            }

            SocketAddress remoteAddress = connParams.getRemoteAddress();
            if (remoteAddress == null) {
                // No mapped remote address: fall back to the session's own destination.
                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
            }

            NioSocketChannelContext channelContext =
                    new NioSocketChannelContext(this, channel, remoteAddress, sessionInfo);
            KeyContext keyContext = new KeyContext(channelContext, handler);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
            registered = true;
            try {
                handler.channelRegistered(channelContext);
            } catch (Exception ex) {
                // NOTE(review): unlike the connect failure below, execution
                // continues after doCatchException here - confirm this is intended.
                doCatchException(key, ex);
            }

            channelContext.selectionKey(key);
            try {
                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
            } catch (Exception ex) {
                doCatchException(key, ex);
                return;
            }

            selector.wakeup();
        } catch (Exception e) {
            if (!registered) {
                closeUnregistered(channel);
            }
            reportConnectFailure(e);
        }
    }

    /**
     * Opens a UDP channel for the session, registers it with an empty interest
     * set, connects it to the remote address and then calls
     * {@code channelActive} on the handler.
     *
     * <p>Nothing is done when {@code getConnParams} returns {@code null}. If a
     * failure occurs before the channel has been registered with the selector,
     * the channel is closed so that it is not leaked.
     */
    protected void doConnectDatagram(SessionInfo sessionInfo, EmitterHandler handler) {
        DatagramChannel channel = null;
        boolean registered = false;
        try {
            SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
            if (connParams == null) {
                return;
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
            }

            channel = DatagramChannel.open();
            channel.configureBlocking(false);

            SocketAddress bindAddress = connParams.getBindAddress();
            // NOTE(review): same condition as in doConnectSocket - the bind
            // happens only when the address equals ANY_SOCKET; confirm it is
            // not inverted.
            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
            }

            SocketAddress remoteAddress = connParams.getRemoteAddress();
            if (remoteAddress == null) {
                // No mapped remote address: fall back to the session's own destination.
                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
            }

            NioDatagramChannelContext channelContext =
                    new NioDatagramChannelContext(this, channel, remoteAddress, sessionInfo);
            KeyContext keyContext = new KeyContext(channelContext, handler);
            SelectionKey key = channel.register(selector, 0, keyContext);
            registered = true;
            try {
                handler.channelRegistered(channelContext);
            } catch (Exception ex) {
                // NOTE(review): execution continues after doCatchException here,
                // as in doConnectSocket - confirm this is intended.
                doCatchException(key, ex);
            }

            channelContext.selectionKey(key);
            try {
                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
                // The handler is notified right after connect() returns.
                keyContext.handler.channelActive(keyContext.channelContext);
            } catch (Exception ex) {
                doCatchException(key, ex);
                return;
            }

            selector.wakeup();
        } catch (Exception e) {
            if (!registered) {
                closeUnregistered(channel);
            }
            reportConnectFailure(e);
        }
    }

    /** Records the failure in the metric (when metrics are collected) and logs it. */
    private void reportConnectFailure(Exception e) {
        if (collectMetrics) {
            metric.errorCaught(e);
        }
        logger.error(e.getMessage(), e);
    }

    /** Closes a channel that never got registered with the selector; {@code null} is ignored. */
    private static void closeUnregistered(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ignored) {
            // Best-effort cleanup: the original failure is the one reported by the caller.
        }
    }
}
--- a/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java Mon Jul 22 15:46:58 2019 +0200 @@ -22,22 +22,26 @@ protected final PcapSessionEventSourceMetric metric; + protected final int protocolId; + public BaseSessionAnalyzerListener(String sourceName, EventHandler eventHandler, - boolean collectMetric, PcapSessionEventSourceMetric metric) { + boolean collectMetric, PcapSessionEventSourceMetric metric, + int protocolId) { this.sourceName = sourceName; this.eventHandler = eventHandler; this.collectMetric = collectMetric; this.metric = metric; + this.protocolId = protocolId; } protected void firePayloadEvent(Object req, Object resp, SessionContext context, long timestamp) { SessionInfo info = new SessionInfo( context.getSrcIpAddr(), context.getSrcPort(), context.getDstIpAddr(), context.getDstPort(), - HTTP, context.getProtocol(), context.getId()); + context.getProtocol(), protocolId, context.getId()); info.setSourceName(sourceName); - Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName); + Event event = new SessionPayloadEvent(info, req, resp, protocolId, sourceName); event.setTimestamp(timestamp); eventHandler.handle(event); if (collectMetric) {
--- a/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java Mon Jul 22 15:46:58 2019 +0200 @@ -8,13 +8,15 @@ import java.util.HashMap; import java.util.Map; +import static com.passus.st.Protocols.DNS; + public class PcapDnsListener extends BaseSessionAnalyzerListener<Dns> { private final Map<SessionKey, Dns> lastRequests; public PcapDnsListener(String sourceName, EventHandler eventHandler, boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { - super(sourceName, eventHandler, collectMetric, metric); + super(sourceName, eventHandler, collectMetric, metric, DNS); lastRequests = new HashMap<>(maxSessionNum); }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Mon Jul 22 15:46:58 2019 +0200 @@ -9,13 +9,15 @@ import java.util.HashMap; import java.util.Map; +import static com.passus.st.Protocols.HTTP; + public class PcapHttpListener extends BaseSessionAnalyzerListener<HttpMessage> { private final Map<SessionKey, HttpRequest> lastRequests; public PcapHttpListener(String sourceName, EventHandler eventHandler, boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { - super(sourceName, eventHandler, collectMetric, metric); + super(sourceName, eventHandler, collectMetric, metric, HTTP); lastRequests = new HashMap<>(maxSessionNum); }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java Mon Jul 22 15:46:58 2019 +0200 @@ -3,6 +3,8 @@ import com.passus.net.netflow.NetflowUdpSessionAnalyzer; import com.passus.net.session.SessionAnalyzer; +import static com.passus.st.Protocols.DNS; + public class PcapNetflowSessionAnalyzerHook extends PcapSessionAnalyzerHook { @Override @@ -15,7 +17,7 @@ PcapUnidirectionalAnalyzerListener listener = new PcapUnidirectionalAnalyzerListener(context.getSourceName(), context.getEventHandler(), context.isCollectMetric(), - context.getMetric()); + context.getMetric(), DNS); analyzer.setListener(listener); context.getUdpProcessor().addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapUnidirectionalAnalyzerListener.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapUnidirectionalAnalyzerListener.java Mon Jul 22 15:46:58 2019 +0200 @@ -6,8 +6,8 @@ public class PcapUnidirectionalAnalyzerListener<T> extends BaseSessionAnalyzerListener<T> { public PcapUnidirectionalAnalyzerListener(String sourceName, EventHandler eventHandler, - boolean collectMetric, PcapSessionEventSourceMetric metric) { - super(sourceName, eventHandler, collectMetric, metric); + boolean collectMetric, PcapSessionEventSourceMetric metric, int protocolId) { + super(sourceName, eventHandler, collectMetric, metric, protocolId); } @Override
--- a/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -8,7 +8,9 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static com.github.tomakehurst.wiremock.client.WireMock.*; @@ -38,9 +40,9 @@ @Test(enabled = false) public void testHandle() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); assertEquals(4, events.size());
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -8,9 +8,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; +import java.util.*; import static com.github.tomakehurst.wiremock.client.WireMock.*; import static org.testng.AssertJUnit.assertEquals; @@ -40,9 +38,9 @@ @Test public void testHandle_HTTP() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); assertEquals(4, events.size()); @@ -75,9 +73,9 @@ @Test public void testHandle_HTTP_ConnectPartialSession() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); @@ -105,9 +103,9 @@ @Test public void testHandle_HTTP_ThreeLoops() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); LinkedList<Event> events = new LinkedList<>(EventUtils.readEvents("pcap/http/http_req_resp.pcap", props)); assertEquals(4, events.size()); Event dataEnd = events.removeLast(); //Usuwamy DataEnd
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -8,7 +8,9 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static com.github.tomakehurst.wiremock.client.WireMock.*; @@ -38,9 +40,9 @@ @Test public void testHandle() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); assertEquals(4, events.size());
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -16,10 +16,7 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.Queue; +import java.util.*; import static com.passus.st.utils.Assert.assertHttpClientEvents; import static org.testng.AssertJUnit.assertEquals; @@ -199,9 +196,9 @@ } private List<Event> readEvents(String pcapFile) throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); return EventUtils.readEvents(pcapFile, props); }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -13,7 +13,9 @@ import com.passus.st.utils.EventUtils; import com.passus.st.utils.NcDataBlockReaderUtils; import java.io.File; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static org.testng.AssertJUnit.assertEquals; import org.testng.annotations.Test; @@ -26,9 +28,9 @@ @Test public void testHandle() throws Exception { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); assertEquals(4, events.size());
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Mon Jul 22 15:46:58 2019 +0200 @@ -7,7 +7,9 @@ import com.passus.st.utils.EventUtils; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static com.passus.st.utils.Assert.*; import org.testng.annotations.DataProvider; @@ -20,9 +22,9 @@ public class NcEventSourceTest { private FileEvents writeEvents(String pcapFile) throws IOException { - Properties props = new Properties(); - props.put("allowPartialSession", "true"); - props.put("ports", "4214,8080"); + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); List<Event> events = EventUtils.readEvents(pcapFile, props); File tmpFile = createTmpFile(false);
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java Fri Jul 19 14:56:06 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java Mon Jul 22 15:46:58 2019 +0200 @@ -1,15 +1,19 @@ package com.passus.st.utils; +import com.passus.commons.utils.ArrayUtils; import com.passus.commons.utils.ResourceUtils; import com.passus.net.PortRangeSet; +import com.passus.net.dns.session.DnsUdpSessionAnalyzer; import com.passus.net.http.session.HttpSessionAnalyzer; +import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event; import com.passus.st.source.PcapSessionEventSource; import java.io.File; import java.util.List; -import java.util.Properties; +import java.util.Map; +import java.util.Set; /** * @author Mirosław Hawrot @@ -23,16 +27,23 @@ return readEvents(pcapRelativeFile, null); } - public static List<Event> readEvents(String pcapRelativeFile, Properties props) { + public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) { try { boolean allowPartialSession = false; + Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP); PortRangeSet portsRange = null; if (props != null) { - allowPartialSession = Boolean.parseBoolean(props.getProperty("allowPartialSession", "false")); + allowPartialSession = (boolean) props.getOrDefault("allowPartialSession", false); - if (props.containsKey("ports")) { - portsRange = PortRangeSet.parse(props.getProperty("ports")); + Object ports = props.getOrDefault("ports", null); + if (ports instanceof PortRangeSet) { + portsRange = (PortRangeSet) ports; + } else if (ports instanceof Integer) { + portsRange = new PortRangeSet(); + portsRange.add((Integer) ports); } + + protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP)); } File pcapFile = ResourceUtils.getFile(pcapRelativeFile); @@ -40,11 +51,21 @@ src.setAllowPartialSession(allowPartialSession); 
src.setPcapFile(pcapFile.getAbsolutePath()); - HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); + if (protocols.contains(Protocols.HTTP)) { + HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); } - src.addAnalyzer(analyzer); + + if (protocols.contains(Protocols.DNS)) { + DnsUdpSessionAnalyzer analyzer = new DnsUdpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); + } src.setLoops(1);