Mercurial > stress-tester
changeset 969:7b4dfce62a6b
PcapSessionEventSource - DNS integration
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java Fri Jul 19 13:22:22 2019 +0200 @@ -11,7 +11,6 @@ import java.nio.channels.SelectionKey; import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; -import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; public class NioDatagramEmitterWorker extends NioAbstractEmitterWorker {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java Fri Jul 19 13:22:22 2019 +0200 @@ -0,0 +1,75 @@ +package com.passus.st.source; + +import com.passus.net.session.SessionAnalyzerListener; +import com.passus.net.session.SessionContext; +import com.passus.st.client.Event; +import com.passus.st.client.EventHandler; +import com.passus.st.client.SessionPayloadEvent; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.emitter.SessionInfo; + +import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE; +import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED; +import static com.passus.st.Protocols.HTTP; + +public abstract class BaseSessionAnalyzerListener<T> implements SessionAnalyzerListener<T> { + + protected final String sourceName; + + protected final EventHandler eventHandler; + + protected final boolean collectMetric; + + protected final PcapSessionEventSourceMetric metric; + + public BaseSessionAnalyzerListener(String sourceName, EventHandler eventHandler, + boolean collectMetric, PcapSessionEventSourceMetric metric) { + this.sourceName = sourceName; + this.eventHandler = eventHandler; + this.collectMetric = collectMetric; + this.metric = metric; + } + + protected void firePayloadEvent(Object req, Object resp, SessionContext context, long timestamp) { + SessionInfo info = new SessionInfo( + context.getSrcIpAddr(), context.getSrcPort(), + context.getDstIpAddr(), context.getDstPort(), + HTTP, context.getProtocol(), context.getId()); + info.setSourceName(sourceName); + + Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName); + event.setTimestamp(timestamp); + eventHandler.handle(event); + if (collectMetric) { + metric.incPayloads(); + } + } + + protected void onSessionClosed(SessionContext context, int status, long timestamp) { + + } + + @Override + public void onSessionStatusChanged(SessionContext context, int status, long timestamp) { + int sessionInfoStatus; + if (status < STATUS_ESTABLISHED) { + sessionInfoStatus = SessionStatusEvent.STATUS_OPENING; + } else if (status == STATUS_ESTABLISHED) { + sessionInfoStatus = SessionStatusEvent.STATUS_ESTABLISHED; + } else if (status != STATUS_CLOSE) { + sessionInfoStatus = SessionStatusEvent.STATUS_CLOSING; + } else { + sessionInfoStatus = SessionStatusEvent.STATUS_CLOSED; + onSessionClosed(context, status, timestamp); + } + + SessionInfo info = new SessionInfo( + context.getSrcIpAddr(), context.getSrcPort(), + context.getDstIpAddr(), context.getDstPort(), + HTTP, context.getProtocol(), context.getId()); + info.setSourceName(sourceName); + Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName); + event.setTimestamp(timestamp); + eventHandler.handle(event); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsListener.java Fri Jul 19 13:22:22 2019 +0200 @@ -0,0 +1,39 @@ +package com.passus.st.source; + +import com.passus.net.dns.Dns; +import com.passus.net.session.SessionContext; +import com.passus.net.session.SessionKey; +import com.passus.st.client.EventHandler; + +import java.util.HashMap; +import java.util.Map; + +public class PcapDnsListener extends BaseSessionAnalyzerListener<Dns> { + + private final Map<SessionKey, Dns> lastRequests; + + public PcapDnsListener(String sourceName, EventHandler eventHandler, + boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { + super(sourceName, eventHandler, collectMetric, metric); + lastRequests = new HashMap<>(maxSessionNum); + } + + @Override + public void onMessageReceived(SessionContext context, Dns dns, long timestamp) { + if (dns.isRequest()) { + lastRequests.put(context.getKey(), dns); + } else { + Dns req = lastRequests.remove(context.getKey()); + firePayloadEvent(req, dns, context, timestamp); + } + } + + @Override + protected void onSessionClosed(SessionContext context, int status, long timestamp) { + Dns req = lastRequests.get(context.getKey()); + if (req != null) { + firePayloadEvent(req, null, context, timestamp); + } + } + +}
--- a/stress-tester/src/main/java/com/passus/st/source/PcapDnsSessionAnalyzerHook.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapDnsSessionAnalyzerHook.java Fri Jul 19 13:22:22 2019 +0200 @@ -14,6 +14,13 @@ @Override protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) { + PcapDnsListener listener = new PcapDnsListener(context.getSourceName(), + context.getEventHandler(), + context.isCollectMetric(), + context.getMetric(), + context.getTcpProcessor().getMaxSessionNumber()); + analyzer.setListener(listener); + if (analyzer instanceof DnsUdpSessionAnalyzer) { context.getUdpProcessor().addAnalyzer(analyzer); } else { @@ -23,6 +30,7 @@ @Override protected void doDetach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) { + analyzer.setListener(null); if (analyzer instanceof DnsUdpSessionAnalyzer) { context.getUdpProcessor().removeAnalyzer(analyzer); } else {
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpListener.java Fri Jul 19 13:22:22 2019 +0200 @@ -2,57 +2,21 @@ import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.session.HttpSessionListener; import com.passus.net.session.SessionContext; import com.passus.net.session.SessionKey; -import com.passus.st.client.Event; import com.passus.st.client.EventHandler; -import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.client.SessionStatusEvent; -import com.passus.st.emitter.SessionInfo; import java.util.HashMap; import java.util.Map; -import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE; -import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED; -import static com.passus.st.Protocols.HTTP; - -public class PcapHttpListener implements HttpSessionListener { - - private final String sourceName; +public class PcapHttpListener extends BaseSessionAnalyzerListener<HttpMessage> { private final Map<SessionKey, HttpRequest> lastRequests; - private final EventHandler eventHandler; - - private final boolean collectMetric; - - private final PcapSessionEventSourceMetric metric; - - public PcapHttpListener(String sourceName, int maxSessionNum, EventHandler eventHandler, - boolean collectMetric, PcapSessionEventSourceMetric metric) { - this.sourceName = sourceName; + public PcapHttpListener(String sourceName, EventHandler eventHandler, + boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { + super(sourceName, eventHandler, collectMetric, metric); lastRequests = new HashMap<>(maxSessionNum); - this.eventHandler = eventHandler; - this.collectMetric = collectMetric; - this.metric = metric; - } - - private void fireRequest(HttpRequest req, HttpResponse resp, SessionContext context, long timestamp) { - SessionInfo info = new SessionInfo( - context.getSrcIpAddr(), context.getSrcPort(), - context.getDstIpAddr(), context.getDstPort(), - HTTP, context.getProtocol(), context.getId()); - info.setSourceName(sourceName); - - Event event = new SessionPayloadEvent(info, req, resp, HTTP, sourceName); - event.setTimestamp(timestamp); - eventHandler.handle(event); - if (collectMetric) { - metric.incPayloads(); - } } @Override @@ -61,36 +25,16 @@ lastRequests.put(context.getKey(), (HttpRequest) message); } else { HttpRequest req = lastRequests.remove(context.getKey()); - fireRequest(req, (HttpResponse) message, context, timestamp); + firePayloadEvent(req, message, context, timestamp); } } @Override - public void onSessionStatusChanged(SessionContext context, int status, long timestamp) { - int sessionInfoStatus; - if (status < STATUS_ESTABLISHED) { - sessionInfoStatus = SessionStatusEvent.STATUS_OPENING; - } else if (status == STATUS_ESTABLISHED) { - sessionInfoStatus = SessionStatusEvent.STATUS_ESTABLISHED; - } else if (status != STATUS_CLOSE) { - sessionInfoStatus = SessionStatusEvent.STATUS_CLOSING; - } else { - sessionInfoStatus = SessionStatusEvent.STATUS_CLOSED; - HttpRequest req = lastRequests.get(context.getKey()); - if (req != null) { - fireRequest(req, null, context, timestamp); - } + protected void onSessionClosed(SessionContext context, int status, long timestamp) { + HttpRequest req = lastRequests.get(context.getKey()); + if (req != null) { + firePayloadEvent(req, null, context, timestamp); } - - SessionInfo info = new SessionInfo( - context.getSrcIpAddr(), context.getSrcPort(), - context.getDstIpAddr(), context.getDstPort(), - HTTP, context.getProtocol(), context.getId()); - info.setSourceName(sourceName); - - Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName); - event.setTimestamp(timestamp); - eventHandler.handle(event); } } \ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapHttpSessionAnalyzerHook.java Fri Jul 19 13:22:22 2019 +0200 @@ -15,10 +15,10 @@ protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) { TcpSessionProcessor tcpProc = context.getTcpProcessor(); PcapHttpListener listener = new PcapHttpListener(context.getSourceName(), - context.getTcpProcessor().getMaxSessionNumber(), context.getEventHandler(), context.isCollectMetric(), - context.getMetric()); + context.getMetric(), + context.getTcpProcessor().getMaxSessionNumber()); analyzer.setListener(listener); tcpProc.addAnalyzer(analyzer);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Fri Jul 19 13:22:22 2019 +0200 @@ -15,7 +15,6 @@ import com.passus.config.validation.LongValidator; import com.passus.net.Frame; import com.passus.net.MemoryFrame; -import com.passus.net.PortRangeSet; import com.passus.net.dns.session.DnsUdpSessionAnalyzer; import com.passus.net.http.session.HttpSessionAnalyzer; import com.passus.net.packet.*; @@ -79,8 +78,6 @@ private boolean allowPartialSession = false; - private final PortRangeSet portsRange = new PortRangeSet(); - protected boolean collectMetric; private PcapSessionEventSourceMetric metric; @@ -90,8 +87,6 @@ private List<SessionAnalyzer> analyzers = new ArrayList<>(); public PcapSessionEventSource() { - portsRange.add(80); - portsRange.add(8080); this.name = UniqueIdGenerator.generate(); hooks.add(new PcapHttpSessionAnalyzerHook()); @@ -263,10 +258,8 @@ } try { - - pcapThread = new PcapThread(pcapFile, loops); - sessionAnalyzerThread = new SessionAnalyzerThread(portsRange, allowPartialSession); + sessionAnalyzerThread = new SessionAnalyzerThread(allowPartialSession); pcapThread.start(); sessionAnalyzerThread.start(); @@ -499,7 +492,7 @@ private final PcapSessionAnalyzerHookContext hookContext; - private SessionAnalyzerThread(PortRangeSet portsRange, boolean allowPartialSession) { + private SessionAnalyzerThread(boolean allowPartialSession) { super(PcapSessionEventSource.class.getSimpleName() + ".SessionAnalyzerThread"); tcpProc = new TcpSessionProcessor(); @@ -546,7 +539,11 @@ long time = ipEvent.ip.getTimestamp(); timeGenerator.setTimeMillis(time); - tcpProc.handle(ipEvent.ip); + if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) { + tcpProc.handle(ipEvent.ip); + } else if (ipEvent.ip.getProtocol() == Ip.PROTO_UDP) { + udpProc.handle(ipEvent.ip); + } if (collectMetric) { if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) {
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Mon Jul 08 10:56:11 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Fri Jul 19 13:22:22 2019 +0200 @@ -1,43 +1,27 @@ package com.passus.st.source; -import com.passus.st.client.SessionPayloadEvent; import com.passus.commons.utils.ResourceUtils; +import com.passus.net.dns.Dns; +import com.passus.net.dns.session.DnsUdpSessionAnalyzer; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; import com.passus.st.client.Event; +import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; -import java.io.File; -import static org.testng.Assert.*; import org.testng.annotations.Test; +import java.io.File; + +import static org.testng.Assert.assertTrue; + /** - * * @author Mirosław Hawrot */ public class PcapSessionEventSourceTest { - @Test(enabled = true) - public void testProcess() throws Exception { - File pcapFile = ResourceUtils.getFile("pcap/http/wget.pcap"); - PcapSessionEventSource src = new PcapSessionEventSource(); - src.setName("pcapSource"); - src.setLoops(1); - src.setPcapFile(pcapFile.getAbsolutePath()); - - ArrayListEventHandler handler = new ArrayListEventHandler(); - src.setHandler(handler); - - src.start(); - waitForSource(src); - src.stop(); - - assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent); - assertTrue(handler.findFirst(SessionPayloadEvent.TYPE) instanceof SessionPayloadEvent); - assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd); - assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); - } - private void doAssetsOrder(String fileName) { File pcapFile = ResourceUtils.getFile(fileName); PcapSessionEventSource src = new PcapSessionEventSource(); @@ -67,6 +51,55 @@ doAssetsOrder("pcap/http/ndiag-login-fresh.pcap"); } + @Test(enabled = true) + public void testProcessHttp() throws Exception { + File pcapFile = ResourceUtils.getFile("pcap/http/wget.pcap"); + PcapSessionEventSource src = new PcapSessionEventSource(); + src.setName("pcapSource"); + src.setLoops(1); + src.setPcapFile(pcapFile.getAbsolutePath()); + + ArrayListEventHandler handler = new ArrayListEventHandler(); + src.setHandler(handler); + + src.start(); + waitForSource(src); + src.stop(); + + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE); + + assertTrue(payloadEvent.getRequest() instanceof HttpRequest); + assertTrue(payloadEvent.getResponse() instanceof HttpResponse); + + assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent); + assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd); + assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); + } + + @Test(enabled = true) + public void testProcessDnsUdp() throws Exception { + File pcapFile = ResourceUtils.getFile("pcap/dns/dns_A_req_resp.pcap"); + PcapSessionEventSource src = new PcapSessionEventSource(); + src.addAnalyzer(new DnsUdpSessionAnalyzer()); + src.setName("pcapSource"); + src.setLoops(1); + src.setPcapFile(pcapFile.getAbsolutePath()); + + ArrayListEventHandler handler = new ArrayListEventHandler(); + src.setHandler(handler); + + src.start(); + waitForSource(src); + src.stop(); + + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE); + + assertTrue(payloadEvent.getRequest() instanceof Dns); + assertTrue(payloadEvent.getResponse() instanceof Dns); + + assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); + } + public static void waitForSource(PcapSessionEventSource src) { try { Thread.sleep(200);