changeset 971:b429501707ca

PcapSessionEventSource - DNS broadcast in progress
author Devel 2
date Mon, 22 Jul 2019 15:46:58 +0200
parents b0e26fb79c3a
children 6fc989064ecf
files stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapUnidirectionalAnalyzerListener.java stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/EventUtils.java
diffstat 21 files changed, 400 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Mon Jul 22 15:46:58 2019 +0200
@@ -5,6 +5,7 @@
     public static final int UNKNOWN = 0;
     public static final int HTTP = 1;
     public static final int DNS = 2;
+    public static final int NETFLOW = 3;
 
     private Protocols() {
     }
@@ -15,6 +16,8 @@
                 return "HTTP";
             case DNS:
                 return "DNS";
+            case NETFLOW:
+                return "NETFLOW";
             default:
                 return "unknown";
         }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Mon Jul 22 15:46:58 2019 +0200
@@ -1,7 +1,9 @@
 package com.passus.st.client;
 
+import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
 
+import static com.passus.st.Protocols.DNS;
 import static com.passus.st.Protocols.HTTP;
 
 public class FlowHandlerFactoryImpl implements FlowHandlerFactory {
@@ -11,6 +13,9 @@
         switch (protocolId) {
             case HTTP:
                 return new HttpFlowHandler();
+            case DNS:
+                return new DnsFlowHandler();
+
         }
 
         throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'.");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Mon Jul 22 15:46:58 2019 +0200
@@ -0,0 +1,88 @@
+package com.passus.st.client.dns;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandler;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.DNS;
+
+/**
+ * Flow handler for the DNS protocol.
+ * <p>
+ * A single response decoder and a single request encoder are created together with the handler
+ * and returned for every flow context. Metrics are not written yet.
+ */
+public class DnsFlowHandler implements FlowHandler, TimeAware {
+
+    // Package-private on purpose: DnsFlowHandlerDataDecoder reads this field directly.
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    private final DnsFlowHandlerDataEncoder encoder;
+
+    private final DnsFlowHandlerDataDecoder decoder;
+
+    public DnsFlowHandler() {
+        encoder = new DnsFlowHandlerDataEncoder();
+        decoder = new DnsFlowHandlerDataDecoder(this);
+    }
+
+    /** Nothing to initialise for DNS at the moment. */
+    @Override
+    public void init(FlowContext flowContext) {
+        // nothing to prepare
+    }
+
+    /** @return the {@code DNS} protocol identifier */
+    @Override
+    public int getProtocolId() {
+        return DNS;
+    }
+
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return this.encoder;
+    }
+
+    @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return this.decoder;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return this.collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /** No-op: DNS metrics are not collected yet. */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        // intentionally empty
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return this.timeGenerator;
+    }
+
+    /**
+     * @param timeGenerator new time source; must not be {@code null}
+     */
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java	Mon Jul 22 15:46:58 2019 +0200
@@ -0,0 +1,76 @@
+package com.passus.st.client.dns;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.dns.Dns;
+import com.passus.net.dns.DnsDecoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+
+/**
+ * Response decoder handed out by {@link DnsFlowHandler}; all decoding is delegated to a
+ * {@link DnsDecoder}.
+ */
+public class DnsFlowHandlerDataDecoder implements FlowHandlerDataDecoder<Dns> {
+
+    private static final Logger LOGGER = LogManager.getLogger(DnsFlowHandlerDataDecoder.class);
+
+    private final DnsDecoder decoder = new DnsDecoder();
+
+    private final DnsFlowHandler handler;
+
+    /**
+     * @param handler owning handler; its time generator is read when a response is decoded
+     */
+    public DnsFlowHandlerDataDecoder(DnsFlowHandler handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    public Dns getResult() {
+        return decoder.getResult();
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    /**
+     * Passes {@code buffer} to the underlying decoder and, once its state is
+     * {@link DataDecoder#STATE_FINISHED}, logs the result at debug level.
+     *
+     * @return whatever the underlying decoder's {@code decode} call returned
+     */
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        int res = decoder.decode(buffer);
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            long now = handler.timeGenerator.currentTimeMillis();
+
+            if (LOGGER.isDebugEnabled()) {
+                debug(LOGGER, flowContext,
+                        "Response decoded (size: {} B, downloaded: {} ms)",
+                        res,
+                        now - flowContext.receivedStartTimestamp()
+                );
+            }
+
+            // TODO: add metrics. When the handler collects metrics, record the number of
+            //  responses, the response size, the receiving time (now - receivedStartTimestamp)
+            //  and, if the matching request is known, the response time.
+        }
+
+        return res;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataEncoder.java	Mon Jul 22 15:46:58 2019 +0200
@@ -0,0 +1,24 @@
+package com.passus.st.client.dns;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.dns.Dns;
+import com.passus.net.dns.DnsEncoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+/**
+ * Request encoder handed out by {@code DnsFlowHandler}; serialization is delegated to a
+ * {@link DnsEncoder}.
+ */
+public class DnsFlowHandlerDataEncoder implements FlowHandlerDataEncoder<Dns> {
+
+    private final DnsEncoder dnsEncoder = new DnsEncoder();
+
+    /**
+     * Writes {@code dns} into {@code out}; {@code flowContext} is not used.
+     */
+    @Override
+    public void encode(Dns dns, FlowContext flowContext, ByteBuff out) {
+        this.dnsEncoder.encode(dns, out);
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Mon Jul 22 15:46:58 2019 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.client.http;
 
+import com.passus.commons.Assert;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
 import com.passus.net.http.HttpRequest;
@@ -61,6 +62,7 @@
 
     @Override
     public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
         this.timeGenerator = timeGenerator;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java	Mon Jul 22 15:46:58 2019 +0200
@@ -20,12 +20,12 @@
 
     private final HttpFullMessageDecoder decoder;
 
-    private final HttpFlowHandler client;
+    private final HttpFlowHandler handler;
 
     private HttpRequest lastRequest;
 
-    public HttpFlowHandlerDataDecoder(HttpFlowHandler client) {
-        this.client = client;
+    public HttpFlowHandlerDataDecoder(HttpFlowHandler handler) {
+        this.handler = handler;
         decoder = new HttpFullMessageDecoder();
         decoder.setDecodeRequest(false);
     }
@@ -67,7 +67,7 @@
 
         int res = decoder.decode(buffer);
         if (decoder.state() == DataDecoder.STATE_FINISHED) {
-            long now = client.timeGenerator.currentTimeMillis();
+            long now = handler.timeGenerator.currentTimeMillis();
             HttpResponse resp = (HttpResponse) decoder.getResult();
 
             if (LOGGER.isDebugEnabled()) {
@@ -79,14 +79,14 @@
                 );
             }
 
-            if (client.isCollectMetrics()) {
-                synchronized (client.metric) {
-                    client.metric.incResponsesNum();
-                    client.metric.addResponseStatusCode(resp.getStatus().getCode());
-                    client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
-                    client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+            if (handler.isCollectMetrics()) {
+                synchronized (handler.metric) {
+                    handler.metric.incResponsesNum();
+                    handler.metric.addResponseStatusCode(resp.getStatus().getCode());
+                    handler.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
+                    handler.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
                     if (lastRequest != null) {
-                        client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
+                        handler.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
                     }
                 }
             }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Mon Jul 22 15:46:58 2019 +0200
@@ -58,7 +58,7 @@
     private final Class<? extends NioEmitterWorker> workerClass;
 
     public NioEmitter() {
-        this(NioSocketEmitterWorker.class);
+        this(NioEmitterWorkerImpl.class);
     }
 
     public NioEmitter(Class<? extends NioEmitterWorker> workerClass) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Mon Jul 22 15:46:58 2019 +0200
@@ -0,0 +1,176 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.net.SocketAddress;
+import com.passus.net.session.Session;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+/**
+ * Emitter worker that opens a {@link SocketChannel} for TCP sessions and a
+ * {@link DatagramChannel} for UDP sessions.
+ */
+public class NioEmitterWorkerImpl extends NioAbstractEmitterWorker {
+
+    public NioEmitterWorkerImpl(int index) throws IOException {
+        super(index);
+    }
+
+    /**
+     * Dispatches to the TCP or UDP connect routine according to the session transport.
+     * Any other transport is reported in the log instead of being dropped silently.
+     */
+    @Override
+    protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
+            doConnectSocket(sessionInfo, handler);
+        } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
+            doConnectDatagram(sessionInfo, handler);
+        } else {
+            logger.error("Unsupported transport '{}' of session '{}'.", sessionInfo.getTransport(), sessionInfo);
+        }
+    }
+
+    /**
+     * Opens a non-blocking TCP channel, registers it for {@code OP_CONNECT} and starts connecting.
+     * The channel is closed if anything fails before it has been registered with the selector.
+     */
+    protected void doConnectSocket(SessionInfo sessionInfo, EmitterHandler handler) {
+        SocketChannel channel = null;
+        SelectionKey key = null;
+        try {
+            SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
+            if (connParams == null) {
+                return;
+            }
+
+            channel = SocketChannel.open();
+            channel.configureBlocking(false);
+
+            SocketAddress bindAddress = connParams.getBindAddress();
+            // NOTE(review): binds only when the mapped address equals ANY_SOCKET - confirm the
+            //  condition is not inverted (an explicit bind address is currently ignored).
+            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            NioSocketChannelContext channelContext = new NioSocketChannelContext(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+            key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
+            try {
+                handler.channelRegistered(channelContext);
+            } catch (Exception ex) {
+                // NOTE(review): unlike a connect failure below, this does not abort - confirm intended.
+                doCatchException(key, ex);
+            }
+
+            channelContext.selectionKey(key);
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+                return;
+            }
+
+            selector.wakeup();
+        } catch (Exception e) {
+            if (key == null) {
+                // Not registered yet, so no one else holds a reference that could close this channel.
+                closeQuietly(channel);
+            }
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Opens a non-blocking UDP channel, registers it with the selector, connects it to the
+     * remote address and notifies the handler that the channel is active.
+     * The channel is closed if anything fails before it has been registered with the selector.
+     */
+    protected void doConnectDatagram(SessionInfo sessionInfo, EmitterHandler handler) {
+        DatagramChannel channel = null;
+        SelectionKey key = null;
+        try {
+            SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
+            if (connParams == null) {
+                return;
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+            }
+
+            channel = DatagramChannel.open();
+            channel.configureBlocking(false);
+
+            SocketAddress bindAddress = connParams.getBindAddress();
+            // NOTE(review): same condition as in doConnectSocket - confirm it is not inverted.
+            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            NioDatagramChannelContext channelContext = new NioDatagramChannelContext(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+            // OP_CONNECT is not a valid operation for a DatagramChannel, so the key starts empty.
+            key = channel.register(selector, 0, keyContext);
+            try {
+                handler.channelRegistered(channelContext);
+            } catch (Exception ex) {
+                // NOTE(review): unlike a connect failure below, this does not abort - confirm intended.
+                doCatchException(key, ex);
+            }
+
+            channelContext.selectionKey(key);
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+                keyContext.handler.channelActive(keyContext.channelContext);
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+                return;
+            }
+
+            selector.wakeup();
+        } catch (Exception e) {
+            if (key == null) {
+                // Not registered yet, so no one else holds a reference that could close this channel.
+                closeQuietly(channel);
+            }
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /** Closes {@code channel} if it was opened; a failure to close is only logged. */
+    private void closeQuietly(Channel channel) {
+        if (channel == null) {
+            return;
+        }
+        try {
+            channel.close();
+        } catch (IOException ex) {
+            logger.debug(ex.getMessage(), ex);
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java	Mon Jul 22 15:46:58 2019 +0200
@@ -22,22 +22,26 @@
 
     protected final PcapSessionEventSourceMetric metric;
 
+    protected final int protocolId;
+
     public BaseSessionAnalyzerListener(String sourceName, EventHandler eventHandler,
-                                       boolean collectMetric, PcapSessionEventSourceMetric metric) {
+                                       boolean collectMetric, PcapSessionEventSourceMetric metric,
+                                       int protocolId) {
         this.sourceName = sourceName;
         this.eventHandler = eventHandler;
         this.collectMetric = collectMetric;
         this.metric = metric;
+        this.protocolId = protocolId;
     }
 
     protected void firePayloadEvent(Object req, Object resp, SessionContext context, long timestamp) {
         SessionInfo info = new SessionInfo(
                 context.getSrcIpAddr(), context.getSrcPort(),
                 context.getDstIpAddr(), context.getDstPort(),
-                HTTP, context.getProtocol(), context.getId());
+                context.getProtocol(), protocolId, context.getId());
         info.setSourceName(sourceName);
 
-        Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName);
+        Event event = new SessionPayloadEvent(info, req, resp, protocolId, sourceName);
         event.setTimestamp(timestamp);
         eventHandler.handle(event);
         if (collectMetric) {
--- a/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java	Mon Jul 22 15:46:58 2019 +0200
@@ -8,13 +8,15 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static com.passus.st.Protocols.DNS;
+
 public class PcapDnsListener extends BaseSessionAnalyzerListener<Dns> {
 
     private final Map<SessionKey, Dns> lastRequests;
 
     public PcapDnsListener(String sourceName, EventHandler eventHandler,
                            boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
-        super(sourceName, eventHandler, collectMetric, metric);
+        super(sourceName, eventHandler, collectMetric, metric, DNS);
         lastRequests = new HashMap<>(maxSessionNum);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Mon Jul 22 15:46:58 2019 +0200
@@ -9,13 +9,15 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static com.passus.st.Protocols.HTTP;
+
 public class PcapHttpListener extends BaseSessionAnalyzerListener<HttpMessage> {
 
     private final Map<SessionKey, HttpRequest> lastRequests;
 
     public PcapHttpListener(String sourceName, EventHandler eventHandler,
                             boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
-        super(sourceName, eventHandler, collectMetric, metric);
+        super(sourceName, eventHandler, collectMetric, metric, HTTP);
         lastRequests = new HashMap<>(maxSessionNum);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java	Mon Jul 22 15:46:58 2019 +0200
@@ -3,6 +3,8 @@
 import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
 import com.passus.net.session.SessionAnalyzer;
 
+import static com.passus.st.Protocols.NETFLOW;
+
 public class PcapNetflowSessionAnalyzerHook extends PcapSessionAnalyzerHook {
 
     @Override
@@ -15,7 +17,7 @@
         PcapUnidirectionalAnalyzerListener listener = new PcapUnidirectionalAnalyzerListener(context.getSourceName(),
                 context.getEventHandler(),
                 context.isCollectMetric(),
-                context.getMetric());
+                context.getMetric(), NETFLOW); // events produced by the Netflow analyzer carry the NETFLOW protocol id
         analyzer.setListener(listener);
 
         context.getUdpProcessor().addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapUnidirectionalAnalyzerListener.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapUnidirectionalAnalyzerListener.java	Mon Jul 22 15:46:58 2019 +0200
@@ -6,8 +6,8 @@
 public class PcapUnidirectionalAnalyzerListener<T> extends BaseSessionAnalyzerListener<T> {
 
     public PcapUnidirectionalAnalyzerListener(String sourceName, EventHandler eventHandler,
-                                              boolean collectMetric, PcapSessionEventSourceMetric metric) {
-        super(sourceName, eventHandler, collectMetric, metric);
+                                              boolean collectMetric, PcapSessionEventSourceMetric metric, int protocolId) {
+        super(sourceName, eventHandler, collectMetric, metric, protocolId);
     }
 
     @Override
--- a/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -8,7 +8,9 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.*;
@@ -38,9 +40,9 @@
 
     @Test(enabled = false)
     public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
         assertEquals(4, events.size());
 
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -8,9 +8,7 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.*;
 import static org.testng.AssertJUnit.assertEquals;
@@ -40,9 +38,9 @@
 
     @Test
     public void testHandle_HTTP() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
         assertEquals(4, events.size());
 
@@ -75,9 +73,9 @@
 
     @Test
     public void testHandle_HTTP_ConnectPartialSession() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
 
         NioEmitter emitter = prepareEmitter("10.87.110.40:4214 ->  " + HOST + ":" + port());
@@ -105,9 +103,9 @@
 
     @Test
     public void testHandle_HTTP_ThreeLoops() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         LinkedList<Event> events = new LinkedList<>(EventUtils.readEvents("pcap/http/http_req_resp.pcap", props));
         assertEquals(4, events.size());
         Event dataEnd = events.removeLast(); //Usuwamy DataEnd
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -8,7 +8,9 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.*;
@@ -38,9 +40,9 @@
 
     @Test
     public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
         assertEquals(4, events.size());
 
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -16,10 +16,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
+import java.util.*;
 
 import static com.passus.st.utils.Assert.assertHttpClientEvents;
 import static org.testng.AssertJUnit.assertEquals;
@@ -199,9 +196,9 @@
     }
 
     private List<Event> readEvents(String pcapFile) throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         return EventUtils.readEvents(pcapFile, props);
     }
 
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -13,7 +13,9 @@
 import com.passus.st.utils.EventUtils;
 import com.passus.st.utils.NcDataBlockReaderUtils;
 import java.io.File;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import static org.testng.AssertJUnit.assertEquals;
 import org.testng.annotations.Test;
@@ -26,9 +28,9 @@
 
     @Test
     public void testHandle() throws Exception {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        props.put("ports", 4214);
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
         assertEquals(4, events.size());
 
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Mon Jul 22 15:46:58 2019 +0200
@@ -7,7 +7,9 @@
 import com.passus.st.utils.EventUtils;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import static com.passus.st.utils.Assert.*;
 import org.testng.annotations.DataProvider;
@@ -20,9 +22,13 @@
 public class NcEventSourceTest {
 
     private FileEvents writeEvents(String pcapFile) throws IOException {
-        Properties props = new Properties();
-        props.put("allowPartialSession", "true");
-        props.put("ports", "4214,8080");
+        Map<String, Object> props = new HashMap<>();
+        props.put("allowPartialSession", true);
+        // Pass a PortRangeSet so that both original ports (4214 and 8080) are still analyzed.
+        com.passus.net.PortRangeSet ports = new com.passus.net.PortRangeSet();
+        ports.add(4214);
+        ports.add(8080);
+        props.put("ports", ports);
         List<Event> events = EventUtils.readEvents(pcapFile, props);
 
         File tmpFile = createTmpFile(false);
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java	Fri Jul 19 14:56:06 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java	Mon Jul 22 15:46:58 2019 +0200
@@ -1,15 +1,19 @@
 package com.passus.st.utils;
 
+import com.passus.commons.utils.ArrayUtils;
 import com.passus.commons.utils.ResourceUtils;
 import com.passus.net.PortRangeSet;
+import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
 import com.passus.net.http.session.HttpSessionAnalyzer;
+import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
 import com.passus.st.source.PcapSessionEventSource;
 
 import java.io.File;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * @author Mirosław Hawrot
@@ -23,16 +27,23 @@
         return readEvents(pcapRelativeFile, null);
     }
 
-    public static List<Event> readEvents(String pcapRelativeFile, Properties props) {
+    public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) {
         try {
             boolean allowPartialSession = false;
+            Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP);
             PortRangeSet portsRange = null;
             if (props != null) {
-                allowPartialSession = Boolean.parseBoolean(props.getProperty("allowPartialSession", "false"));
+                allowPartialSession = Boolean.parseBoolean(String.valueOf(props.get("allowPartialSession")));
 
-                if (props.containsKey("ports")) {
-                    portsRange = PortRangeSet.parse(props.getProperty("ports"));
+                Object ports = props.get("ports");
+                if (ports instanceof PortRangeSet) {
+                    portsRange = (PortRangeSet) ports;
+                } else if (ports instanceof Integer || ports instanceof String) {
+                    // A single Integer port and the textual "4214,8080" form both go through the parser.
+                    portsRange = PortRangeSet.parse(ports.toString());
                 }
+
+                protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP));
             }
 
             File pcapFile = ResourceUtils.getFile(pcapRelativeFile);
@@ -40,11 +51,21 @@
             src.setAllowPartialSession(allowPartialSession);
             src.setPcapFile(pcapFile.getAbsolutePath());
 
-            HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer();
-            if (portsRange != null) {
-                analyzer.getPortsRange().addAll(portsRange);
+            if (protocols.contains(Protocols.HTTP)) {
+                HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
             }
-            src.addAnalyzer(analyzer);
+
+            if (protocols.contains(Protocols.DNS)) {
+                DnsUdpSessionAnalyzer analyzer = new DnsUdpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
+            }
 
             src.setLoops(1);