changeset 969:7b4dfce62a6b

PcapSessionEventSource - DNS integration
author Devel 2
date Fri, 19 Jul 2019 13:22:22 +0200
parents 0de7ca4925a3
children b0e26fb79c3a
files stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java stress-tester/src/main/java/com/passus/st/source/PcapDnsSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java stress-tester/src/test/resources/pcap/dns/dns_A_req_resp.pcap
diffstat 9 files changed, 198 insertions(+), 103 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java	Fri Jul 19 13:22:22 2019 +0200
@@ -11,7 +11,6 @@
 import java.nio.channels.SelectionKey;
 
 import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
-import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
 
 public class NioDatagramEmitterWorker extends NioAbstractEmitterWorker {
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java	Fri Jul 19 13:22:22 2019 +0200
@@ -0,0 +1,75 @@
+package com.passus.st.source;
+
+import com.passus.net.session.SessionAnalyzerListener;
+import com.passus.net.session.SessionContext;
+import com.passus.st.client.Event;
+import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionPayloadEvent;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.emitter.SessionInfo;
+
+import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE;
+import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED;
+import static com.passus.st.Protocols.HTTP;
+
+public abstract class BaseSessionAnalyzerListener<T> implements SessionAnalyzerListener<T> {
+    // Shared base for pcap session listeners: builds SessionPayloadEvent / SessionStatusEvent objects and passes them to an EventHandler.
+    protected final String sourceName; // set on every SessionInfo and event built here
+
+    protected final EventHandler eventHandler; // receives every event built here
+
+    protected final boolean collectMetric; // gates metric.incPayloads() in firePayloadEvent()
+
+    protected final PcapSessionEventSourceMetric metric; // only dereferenced when collectMetric is true
+    // Stores the four collaborators as given; no validation is performed.
+    public BaseSessionAnalyzerListener(String sourceName, EventHandler eventHandler,
+                                       boolean collectMetric, PcapSessionEventSourceMetric metric) {
+        this.sourceName = sourceName;
+        this.eventHandler = eventHandler;
+        this.collectMetric = collectMetric;
+        this.metric = metric;
+    }
+    // Builds a SessionPayloadEvent for req/resp, stamps it with timestamp, hands it to eventHandler and, if enabled, counts it.
+    protected void firePayloadEvent(Object req, Object resp, SessionContext context, long timestamp) {
+        SessionInfo info = new SessionInfo(
+                context.getSrcIpAddr(), context.getSrcPort(),
+                context.getDstIpAddr(), context.getDstPort(),
+                HTTP, context.getProtocol(), context.getId());
+        info.setSourceName(sourceName);
+        // NOTE(review): protocol is hard-coded to HTTP here, yet PcapDnsListener also fires through this method - confirm intended.
+        Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName);
+        event.setTimestamp(timestamp);
+        eventHandler.handle(event);
+        if (collectMetric) {
+            metric.incPayloads();
+        }
+    }
+    // Hook called by onSessionStatusChanged() from its final else branch (the STATUS_CLOSED mapping), before the status event is emitted.
+    protected void onSessionClosed(SessionContext context, int status, long timestamp) {
+        // Intentionally empty: the default is a no-op.
+    }
+    // Translates the analyzer status into a SessionStatusEvent status and emits the resulting status event.
+    @Override
+    public void onSessionStatusChanged(SessionContext context, int status, long timestamp) {
+        int sessionInfoStatus;
+        if (status < STATUS_ESTABLISHED) {
+            sessionInfoStatus = SessionStatusEvent.STATUS_OPENING;
+        } else if (status == STATUS_ESTABLISHED) {
+            sessionInfoStatus = SessionStatusEvent.STATUS_ESTABLISHED;
+        } else if (status != STATUS_CLOSE) {
+            sessionInfoStatus = SessionStatusEvent.STATUS_CLOSING;
+        } else {
+            sessionInfoStatus = SessionStatusEvent.STATUS_CLOSED;
+            onSessionClosed(context, status, timestamp);
+        }
+        // The status event is built after the close hook, so anything the hook emits reaches eventHandler first.
+        SessionInfo info = new SessionInfo(
+                context.getSrcIpAddr(), context.getSrcPort(),
+                context.getDstIpAddr(), context.getDstPort(),
+                HTTP, context.getProtocol(), context.getId());
+        info.setSourceName(sourceName);
+        Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName);
+        event.setTimestamp(timestamp);
+        eventHandler.handle(event);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java	Fri Jul 19 13:22:22 2019 +0200
@@ -0,0 +1,39 @@
+package com.passus.st.source;
+
+import com.passus.net.dns.Dns;
+import com.passus.net.session.SessionContext;
+import com.passus.net.session.SessionKey;
+import com.passus.st.client.EventHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PcapDnsListener extends BaseSessionAnalyzerListener<Dns> {
+    // Pairs DNS requests with the following non-request message of the same session and emits them as payload events.
+    private final Map<SessionKey, Dns> lastRequests; // last unanswered request per session key
+    // maxSessionNum is used only as the initial capacity of the pending-request map.
+    public PcapDnsListener(String sourceName, EventHandler eventHandler,
+                           boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
+        super(sourceName, eventHandler, collectMetric, metric);
+        lastRequests = new HashMap<>(maxSessionNum);
+    }
+    // A request is parked under its session key; any other message is emitted with the parked request (null when none is parked).
+    @Override
+    public void onMessageReceived(SessionContext context, Dns dns, long timestamp) {
+        if (dns.isRequest()) {
+            lastRequests.put(context.getKey(), dns);
+        } else {
+            Dns req = lastRequests.remove(context.getKey());
+            firePayloadEvent(req, dns, context, timestamp);
+        }
+    }
+    // A request still parked when the session closes is emitted with a null response.
+    @Override
+    protected void onSessionClosed(SessionContext context, int status, long timestamp) {
+        Dns req = lastRequests.remove(context.getKey()); // remove, not get: evict the entry so it is neither leaked nor emitted twice
+        if (req != null) {
+            firePayloadEvent(req, null, context, timestamp);
+        }
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/source/PcapDnsSessionAnalyzerHook.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsSessionAnalyzerHook.java	Fri Jul 19 13:22:22 2019 +0200
@@ -14,6 +14,13 @@
 
     @Override
     protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) {
+        PcapDnsListener listener = new PcapDnsListener(context.getSourceName(),
+                context.getEventHandler(),
+                context.isCollectMetric(),
+                context.getMetric(),
+                context.getTcpProcessor().getMaxSessionNumber());
+        analyzer.setListener(listener);
+
         if (analyzer instanceof DnsUdpSessionAnalyzer) {
             context.getUdpProcessor().addAnalyzer(analyzer);
         } else {
@@ -23,6 +30,7 @@
 
     @Override
     protected void doDetach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) {
+        analyzer.setListener(null);
         if (analyzer instanceof DnsUdpSessionAnalyzer) {
             context.getUdpProcessor().removeAnalyzer(analyzer);
         } else {
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java	Fri Jul 19 13:22:22 2019 +0200
@@ -2,57 +2,21 @@
 
 import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpRequest;
-import com.passus.net.http.HttpResponse;
-import com.passus.net.http.session.HttpSessionListener;
 import com.passus.net.session.SessionContext;
 import com.passus.net.session.SessionKey;
-import com.passus.st.client.Event;
 import com.passus.st.client.EventHandler;
-import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.emitter.SessionInfo;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE;
-import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED;
-import static com.passus.st.Protocols.HTTP;
-
-public class PcapHttpListener implements HttpSessionListener {
-
-    private final String sourceName;
+public class PcapHttpListener extends BaseSessionAnalyzerListener<HttpMessage> {
 
     private final Map<SessionKey, HttpRequest> lastRequests;
 
-    private final EventHandler eventHandler;
-
-    private final boolean collectMetric;
-
-    private final PcapSessionEventSourceMetric metric;
-
-    public PcapHttpListener(String sourceName, int maxSessionNum, EventHandler eventHandler,
-                            boolean collectMetric, PcapSessionEventSourceMetric metric) {
-        this.sourceName = sourceName;
+    public PcapHttpListener(String sourceName, EventHandler eventHandler,
+                            boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
+        super(sourceName, eventHandler, collectMetric, metric);
         lastRequests = new HashMap<>(maxSessionNum);
-        this.eventHandler = eventHandler;
-        this.collectMetric = collectMetric;
-        this.metric = metric;
-    }
-
-    private void fireRequest(HttpRequest req, HttpResponse resp, SessionContext context, long timestamp) {
-        SessionInfo info = new SessionInfo(
-                context.getSrcIpAddr(), context.getSrcPort(),
-                context.getDstIpAddr(), context.getDstPort(),
-                HTTP, context.getProtocol(), context.getId());
-        info.setSourceName(sourceName);
-
-        Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName);
-        event.setTimestamp(timestamp);
-        eventHandler.handle(event);
-        if (collectMetric) {
-            metric.incPayloads();
-        }
     }
 
     @Override
@@ -61,36 +25,16 @@
             lastRequests.put(context.getKey(), (HttpRequest) message);
         } else {
             HttpRequest req = lastRequests.remove(context.getKey());
-            fireRequest(req, (HttpResponse) message, context, timestamp);
+            firePayloadEvent(req, message, context, timestamp);
         }
     }
 
     @Override
-    public void onSessionStatusChanged(SessionContext context, int status, long timestamp) {
-        int sessionInfoStatus;
-        if (status < STATUS_ESTABLISHED) {
-            sessionInfoStatus = SessionStatusEvent.STATUS_OPENING;
-        } else if (status == STATUS_ESTABLISHED) {
-            sessionInfoStatus = SessionStatusEvent.STATUS_ESTABLISHED;
-        } else if (status != STATUS_CLOSE) {
-            sessionInfoStatus = SessionStatusEvent.STATUS_CLOSING;
-        } else {
-            sessionInfoStatus = SessionStatusEvent.STATUS_CLOSED;
-            HttpRequest req = lastRequests.get(context.getKey());
-            if (req != null) {
-                fireRequest(req, null, context, timestamp);
-            }
+    protected void onSessionClosed(SessionContext context, int status, long timestamp) { // emits a request still unanswered at close, with a null response
+        HttpRequest req = lastRequests.remove(context.getKey()); // remove, not get: evict the entry so it is neither leaked nor emitted twice
+        if (req != null) {
+            firePayloadEvent(req, null, context, timestamp);
         }
-
-        SessionInfo info = new SessionInfo(
-                context.getSrcIpAddr(), context.getSrcPort(),
-                context.getDstIpAddr(), context.getDstPort(),
-                HTTP, context.getProtocol(), context.getId());
-        info.setSourceName(sourceName);
-
-        Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName);
-        event.setTimestamp(timestamp);
-        eventHandler.handle(event);
     }
 
 }
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java	Fri Jul 19 13:22:22 2019 +0200
@@ -15,10 +15,10 @@
     protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) {
         TcpSessionProcessor tcpProc = context.getTcpProcessor();
         PcapHttpListener listener = new PcapHttpListener(context.getSourceName(),
-                context.getTcpProcessor().getMaxSessionNumber(),
                 context.getEventHandler(),
                 context.isCollectMetric(),
-                context.getMetric());
+                context.getMetric(),
+                context.getTcpProcessor().getMaxSessionNumber());
 
         analyzer.setListener(listener);
         tcpProc.addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Jul 19 13:22:22 2019 +0200
@@ -15,7 +15,6 @@
 import com.passus.config.validation.LongValidator;
 import com.passus.net.Frame;
 import com.passus.net.MemoryFrame;
-import com.passus.net.PortRangeSet;
 import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
 import com.passus.net.http.session.HttpSessionAnalyzer;
 import com.passus.net.packet.*;
@@ -79,8 +78,6 @@
 
     private boolean allowPartialSession = false;
 
-    private final PortRangeSet portsRange = new PortRangeSet();
-
     protected boolean collectMetric;
 
     private PcapSessionEventSourceMetric metric;
@@ -90,8 +87,6 @@
     private List<SessionAnalyzer> analyzers = new ArrayList<>();
 
     public PcapSessionEventSource() {
-        portsRange.add(80);
-        portsRange.add(8080);
         this.name = UniqueIdGenerator.generate();
 
         hooks.add(new PcapHttpSessionAnalyzerHook());
@@ -263,10 +258,8 @@
         }
 
         try {
-
-
             pcapThread = new PcapThread(pcapFile, loops);
-            sessionAnalyzerThread = new SessionAnalyzerThread(portsRange, allowPartialSession);
+            sessionAnalyzerThread = new SessionAnalyzerThread(allowPartialSession);
 
             pcapThread.start();
             sessionAnalyzerThread.start();
@@ -499,7 +492,7 @@
 
         private final PcapSessionAnalyzerHookContext hookContext;
 
-        private SessionAnalyzerThread(PortRangeSet portsRange, boolean allowPartialSession) {
+        private SessionAnalyzerThread(boolean allowPartialSession) {
             super(PcapSessionEventSource.class.getSimpleName() + ".SessionAnalyzerThread");
 
             tcpProc = new TcpSessionProcessor();
@@ -546,7 +539,11 @@
                             long time = ipEvent.ip.getTimestamp();
                             timeGenerator.setTimeMillis(time);
 
-                            tcpProc.handle(ipEvent.ip);
+                            if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) {
+                                tcpProc.handle(ipEvent.ip);
+                            } else if (ipEvent.ip.getProtocol() == Ip.PROTO_UDP) {
+                                udpProc.handle(ipEvent.ip);
+                            }
 
                             if (collectMetric) {
                                 if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) {
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Mon Jul 08 10:56:11 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Fri Jul 19 13:22:22 2019 +0200
@@ -1,43 +1,27 @@
 package com.passus.st.source;
 
-import com.passus.st.client.SessionPayloadEvent;
 import com.passus.commons.utils.ResourceUtils;
+import com.passus.net.dns.Dns;
+import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
 import com.passus.st.client.Event;
+import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import java.io.File;
-import static org.testng.Assert.*;
 import org.testng.annotations.Test;
 
+import java.io.File;
+
+import static org.testng.Assert.assertTrue;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class PcapSessionEventSourceTest {
 
-    @Test(enabled = true)
-    public void testProcess() throws Exception {
-        File pcapFile = ResourceUtils.getFile("pcap/http/wget.pcap");
-        PcapSessionEventSource src = new PcapSessionEventSource();
-        src.setName("pcapSource");
-        src.setLoops(1);
-        src.setPcapFile(pcapFile.getAbsolutePath());
-
-        ArrayListEventHandler handler = new ArrayListEventHandler();
-        src.setHandler(handler);
-
-        src.start();
-        waitForSource(src);
-        src.stop();
-
-        assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent);
-        assertTrue(handler.findFirst(SessionPayloadEvent.TYPE) instanceof SessionPayloadEvent);
-        assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd);
-        assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
-    }
-
     private void doAssetsOrder(String fileName) {
         File pcapFile = ResourceUtils.getFile(fileName);
         PcapSessionEventSource src = new PcapSessionEventSource();
@@ -67,6 +51,55 @@
         doAssetsOrder("pcap/http/ndiag-login-fresh.pcap");
     }
 
+    @Test(enabled = true)
+    public void testProcessHttp() throws Exception { // runs the source over wget.pcap with one loop and checks the HTTP payload event and the trailing events
+        File pcapFile = ResourceUtils.getFile("pcap/http/wget.pcap");
+        PcapSessionEventSource src = new PcapSessionEventSource();
+        src.setName("pcapSource");
+        src.setLoops(1);
+        src.setPcapFile(pcapFile.getAbsolutePath());
+        // The handler is queried below via findFirst()/get()/size() to check what the source emitted.
+        ArrayListEventHandler handler = new ArrayListEventHandler();
+        src.setHandler(handler);
+
+        src.start();
+        waitForSource(src);
+        src.stop();
+
+        SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE);
+        assertTrue(payloadEvent != null, "no SessionPayloadEvent was emitted"); // fail with a clear message instead of an NPE below
+        assertTrue(payloadEvent.getRequest() instanceof HttpRequest);
+        assertTrue(payloadEvent.getResponse() instanceof HttpResponse);
+        // Tail of the event list: session status, then loop end, then data end.
+        assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent);
+        assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd);
+        assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
+    }
+
+    @Test(enabled = true)
+    public void testProcessDnsUdp() throws Exception { // runs the source over dns_A_req_resp.pcap with a DnsUdpSessionAnalyzer added and checks the DNS payload event
+        File pcapFile = ResourceUtils.getFile("pcap/dns/dns_A_req_resp.pcap");
+        PcapSessionEventSource src = new PcapSessionEventSource();
+        src.addAnalyzer(new DnsUdpSessionAnalyzer());
+        src.setName("pcapSource");
+        src.setLoops(1);
+        src.setPcapFile(pcapFile.getAbsolutePath());
+        // The handler is queried below via findFirst()/get()/size() to check what the source emitted.
+        ArrayListEventHandler handler = new ArrayListEventHandler();
+        src.setHandler(handler);
+
+        src.start();
+        waitForSource(src);
+        src.stop();
+
+        SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE);
+        assertTrue(payloadEvent != null, "no SessionPayloadEvent was emitted"); // fail with a clear message instead of an NPE below
+        assertTrue(payloadEvent.getRequest() instanceof Dns);
+        assertTrue(payloadEvent.getResponse() instanceof Dns);
+        // Only the last event is pinned in this test.
+        assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
+    }
+
     public static void waitForSource(PcapSessionEventSource src) {
         try {
             Thread.sleep(200);
Binary file stress-tester/src/test/resources/pcap/dns/dns_A_req_resp.pcap has changed