changeset 1039:58860e741b9e

NcEventSource, NcEventDestination - PgSql support
author Devel 2
date Fri, 10 Apr 2020 12:47:48 +0200
parents 9dff4810b5b8
children bff9be223040
files stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/utils/EventUtils.java stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/Assert.java stress-tester/src/test/java/com/passus/st/utils/EventUtils.java
diffstat 10 files changed, 109 insertions(+), 161 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java	Fri Apr 10 10:22:01 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-package com.passus.st.reader.nc;
-
-import com.passus.net.http.HttpMessage;
-import com.passus.st.reader.DataBlockReader;
-import java.io.IOException;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class NcHttpDataReader {
-
-    public HttpMessage read(DataBlockReader<NcDataBlock> reader) throws IOException {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadReader.java	Fri Apr 10 12:47:48 2020 +0200
@@ -0,0 +1,39 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.pgsql.PgSqlDecoder;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.st.source.NcPayloadReader;
+
+import java.io.IOException;
+
+public class NcPgSqlPayloadReader extends NcPayloadReader<PgSqlMessage, PgSqlMessage> {
+
+    private final PgSqlDecoder decoderClient = new PgSqlDecoder(true);
+
+    private final PgSqlDecoder decoderServer = new PgSqlDecoder();
+
+    private PgSqlMessage decode(ByteBuff payload, PgSqlDecoder decoder) throws IOException {
+        try {
+            int res = decoder.decode(payload);
+            payload.skipBytes(res);
+            if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                return decoder.getResult();
+            }
+            throw new IOException("Invalid PgSql data.");
+        } finally {
+            decoder.clear();
+        }
+    }
+
+    @Override
+    protected PgSqlMessage decodeRequest(ByteBuff payload) throws IOException {
+        return decode(payload, decoderClient);
+    }
+
+    @Override
+    protected PgSqlMessage decodeResponse(ByteBuff payload) throws IOException {
+        return decode(payload, decoderServer);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java	Fri Apr 10 12:47:48 2020 +0200
@@ -0,0 +1,13 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.pgsql.PgSqlEncoder;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.st.source.AbstractNcPayloadWriter;
+
+public class NcPgSqlPayloadWriter extends AbstractNcPayloadWriter<PgSqlMessage, PgSqlMessage> {
+
+    public NcPgSqlPayloadWriter() {
+        super(new PgSqlEncoder(), new PgSqlEncoder());
+
+    }
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Fri Apr 10 12:47:48 2020 +0200
@@ -9,10 +9,7 @@
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.SessionStatusEvent;
-import com.passus.st.reader.nc.NcDataBlockWriter;
-import com.passus.st.reader.nc.NcDnsPayloadWriter;
-import com.passus.st.reader.nc.NcHttpPayloadWriter;
-import com.passus.st.reader.nc.NcNetflowPayloadWriter;
+import com.passus.st.reader.nc.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,6 +37,8 @@
 
     private final NcNetflowPayloadWriter netflowWriter = new NcNetflowPayloadWriter();
 
+    private final NcPgSqlPayloadWriter pgSqlWriter = new NcPgSqlPayloadWriter();
+
     private boolean allowOverwrite;
 
     public NcEventDestination() {
@@ -132,6 +131,9 @@
             case NETFLOW:
                 netflowWriter.write(event, writer);
                 break;
+            case PGSQL:
+                pgSqlWriter.write(event, writer);
+                break;
         }
     }
 
@@ -156,7 +158,6 @@
         } catch (Exception e) {
             throw new RuntimeException(e.getMessage(), e);
         }
-
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri Apr 10 12:47:48 2020 +0200
@@ -58,6 +58,8 @@
 
     private NcDnsPayloadReader dnsReader = new NcDnsPayloadReader();
 
+    private NcPgSqlPayloadReader pgSqlReader = new NcPgSqlPayloadReader();
+
     private String name = UniqueIdGenerator.generate();
 
     private ReaderThread readerThread;
@@ -253,6 +255,9 @@
                     case NETFLOW:
                         messages = netflowReader.read(payload);
                         break;
+                    case PGSQL:
+                        messages = pgSqlReader.read(payload);
+                        break;
                     default:
                         if (LOGGER.isDebugEnabled()) {
                             LOGGER.debug("Not supported protocol {}.", payloadBlock.proto());
--- a/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java	Fri Apr 10 12:47:48 2020 +0200
@@ -6,6 +6,7 @@
 import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
 import com.passus.net.http.session.HttpSessionAnalyzer;
 import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
+import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer;
 import com.passus.net.session.SessionAnalyzer;
 import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
@@ -57,28 +58,27 @@
             src.setAllowPartialSession(allowPartialSession);
             src.setPcapFile(pcapFile.getAbsolutePath());
 
-            SessionAnalyzer analyzer;
             if (protocols.contains(Protocols.HTTP)) {
-                analyzer = new HttpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
+                SessionAnalyzer analyzer = new HttpSessionAnalyzer();
+                analyzer.getPortsRange().add(80).add(8080).add(4214);
                 src.addAnalyzer(analyzer);
             }
 
             if (protocols.contains(Protocols.DNS)) {
-                analyzer = new DnsUdpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
+                SessionAnalyzer analyzer = new DnsUdpSessionAnalyzer();
+                analyzer.getPortsRange().add(53);
                 src.addAnalyzer(analyzer);
             }
 
             if (protocols.contains(Protocols.NETFLOW)) {
-                analyzer = new NetflowUdpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
+                SessionAnalyzer analyzer = new NetflowUdpSessionAnalyzer();
+                analyzer.getPortsRange().add(2055);
+                src.addAnalyzer(analyzer);
+            }
+
+            if (protocols.contains(Protocols.PGSQL)) {
+                SessionAnalyzer analyzer = new PgSqlTcpSessionAnalyzer();
+                analyzer.getPortsRange().add(5432);
                 src.addAnalyzer(analyzer);
             }
 
--- a/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java	Fri Apr 10 12:47:48 2020 +0200
@@ -25,6 +25,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static com.passus.st.Protocols.HTTP;
 import static com.passus.st.utils.Assert.assertHttpClientEvents;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertFalse;
@@ -281,7 +282,7 @@
         events.forEach(worker::handle);
         join(worker);
 
-        assertHttpClientEvents(events, listener.events());
+        assertHttpClientEvents(events, listener.events(), HTTP);
     }
 
     @Test
@@ -292,7 +293,7 @@
         worker.start();
         events.forEach(worker::handle);
         join(worker);
-        assertHttpClientEvents(events, listener.events());
+        assertHttpClientEvents(events, listener.events(), HTTP);
     }
 
     @Test
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 10 12:47:48 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.PooledByteBuffAllocator;
 import com.passus.net.PortRangeSet;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
@@ -16,6 +17,7 @@
 import java.util.Map;
 
 import static com.passus.commons.utils.ResourceUtils.createTmpFile;
+import static com.passus.st.Protocols.*;
 import static com.passus.st.utils.Assert.assertEquals;
 import static com.passus.st.utils.Assert.assertEvents;
 
@@ -24,9 +26,13 @@
  */
 public class NcEventSourceTest {
 
+    static {
+        Log4jConfigurationFactory.enableFactory("trace");
+    }
+
     private FileEvents writeEvents(String pcapFile) throws IOException {
         PortRangeSet portRanges = new PortRangeSet();
-        portRanges.add(4214).add(53).add(2055);
+        portRanges.add(4214).add(8080).add(53).add(2055).add(5432);
         Map<String, Object> props = new HashMap<>();
         props.put("allowPartialSession", true);
         props.put("ports", portRanges);
@@ -45,18 +51,19 @@
     @DataProvider(name = "pcapFiles")
     public Object[][] pcapFiles() {
         return new Object[][]{
-                {"pcap/http/http_req_resp.pcap"},
-                {"pcap/http/http_ndiag_tcp_conn.pcap"},
-                {"pcap/http/basic_digest.pcap"},
-                {"pcap/http/http_get_png.pcap"},
-                {"pcap/http/http_1.pcap"},
-                {"pcap/dns/dns_A_req_resp.pcap"},
-                {"pcap/netflow/netflow_v5.pcap"}
+                {"pcap/http/http_req_resp.pcap", HTTP},
+                {"pcap/http/http_ndiag_tcp_conn.pcap", HTTP},
+                {"pcap/http/basic_digest.pcap", HTTP},
+                {"pcap/http/http_get_png.pcap", HTTP},
+                {"pcap/http/http_1.pcap", HTTP},
+                {"pcap/dns/dns_A_req_resp.pcap", DNS},
+                {"pcap/netflow/netflow_v5.pcap", NETFLOW},
+                {"pcap/pgsql/pgsql_session.pcap", PGSQL}
         };
     }
 
     @Test(dataProvider = "pcapFiles")
-    public void testWriteAndRead(String pcapFile) throws Exception {
+    public void testWriteAndRead(String pcapFile, int proto) throws Exception {
         FileEvents fileEvents = writeEvents(pcapFile);
         try {
             ArrayListEventHandler handler = new ArrayListEventHandler();
@@ -66,7 +73,7 @@
             eventSource.start();
 
             List<Event> events = handler.getEvents();
-            assertEvents(fileEvents.events, events);
+            assertEvents(fileEvents.events, events, proto);
         } finally {
             fileEvents.ncFile.delete();
         }
@@ -85,7 +92,7 @@
             eventSource.start();
 
             List<Event> events = handler.getEvents();
-            assertEvents(fileEvents.events, events);
+            assertEvents(fileEvents.events, events, HTTP);
             assertEquals(2, allocator.usedSize());
         } finally {
             fileEvents.ncFile.delete();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Fri Apr 10 10:22:01 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Fri Apr 10 12:47:48 2020 +0200
@@ -40,7 +40,7 @@
         assertEquals(null, expectedBs, actual);
     }
 
-    public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) {
+    public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents, int expectedProtocolId) {
         expectedEvents = expectedEvents.stream()
                 .filter((e) -> e.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) e).getProtocolId() == HTTP)
                 .collect(Collectors.toList());
@@ -54,21 +54,27 @@
             }
         });
 
-        assertEvents(expectedEvents, events);
+        assertEvents(expectedEvents, events, expectedProtocolId);
     }
 
-    public static void assertEvents(List<Event> expectedEvents, List<Event> events) {
+    public static void assertEvents(List<Event> expectedEvents, List<Event> events, int expectedProtocolId) {
         assertEquals(expectedEvents.size(), events.size());
+
+        int payloadEvents = 0;
         for (int i = 0; i < events.size(); i++) {
             Event expectedEvent = expectedEvents.get(i);
             Event event = events.get(i);
 
             assertEquals(expectedEvent.getType(), event.getType());
             if (event.getType() == SessionPayloadEvent.TYPE) {
+                payloadEvents++;
                 SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent;
                 SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
 
-                if(((SessionPayloadEvent) event).getProtocolId() == HTTP) {
+                int protocolId = ((SessionPayloadEvent) event).getProtocolId();
+                assertEquals(expectedProtocolId, protocolId);
+
+                if (((SessionPayloadEvent) event).getProtocolId() == HTTP) {
                     assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
                     assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
                     assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
@@ -79,6 +85,8 @@
                 }
             }
         }
+
+        assertTrue("No session payload events.", payloadEvents > 0);
     }
 
     public static void assertNoError(Errors errors) {
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java	Fri Apr 10 10:22:01 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,109 +0,0 @@
-package com.passus.st.utils;
-
-import com.passus.commons.utils.ArrayUtils;
-import com.passus.commons.utils.ResourceUtils;
-import com.passus.net.PortRangeSet;
-import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
-import com.passus.net.http.session.HttpSessionAnalyzer;
-import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
-import com.passus.net.session.SessionAnalyzer;
-import com.passus.st.Protocols;
-import com.passus.st.client.ArrayListEventHandler;
-import com.passus.st.client.Event;
-import com.passus.st.source.PcapSessionEventSource;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * @author Mirosław Hawrot
- */
-public class EventUtils {
-
-    private EventUtils() {
-    }
-
-    public static List<Event> readEvents(String pcapRelativeFile) {
-        return readEvents(pcapRelativeFile, null);
-    }
-
-    public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) {
-        try {
-            boolean allowPartialSession = false;
-            Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP);
-            PortRangeSet portsRange = null;
-            if (props != null) {
-                allowPartialSession = (boolean) props.getOrDefault("allowPartialSession", false);
-
-                Object ports = props.getOrDefault("ports", null);
-                if (ports instanceof PortRangeSet) {
-                    portsRange = (PortRangeSet) ports;
-                } else if (ports instanceof Integer) {
-                    portsRange = new PortRangeSet();
-                    portsRange.add((Integer) ports);
-                }
-
-                protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP));
-            }
-
-            File pcapFile = ResourceUtils.getFile(pcapRelativeFile);
-            PcapSessionEventSource src = new PcapSessionEventSource();
-            src.setAllowPartialSession(allowPartialSession);
-            src.setPcapFile(pcapFile.getAbsolutePath());
-
-            SessionAnalyzer analyzer;
-            if (protocols.contains(Protocols.HTTP)) {
-                analyzer = new HttpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
-                src.addAnalyzer(analyzer);
-            }
-
-            if (protocols.contains(Protocols.DNS)) {
-                analyzer = new DnsUdpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
-                src.addAnalyzer(analyzer);
-            }
-
-            if (protocols.contains(Protocols.NETFLOW)) {
-                analyzer = new NetflowUdpSessionAnalyzer();
-                if (portsRange != null) {
-                    analyzer.getPortsRange().addAll(portsRange);
-                }
-                src.addAnalyzer(analyzer);
-            }
-
-            src.setLoops(1);
-
-            ArrayListEventHandler handler = new ArrayListEventHandler();
-            src.setHandler(handler);
-
-            src.start();
-            waitForSource(src);
-            src.stop();
-
-            return handler.getEvents();
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    public static void waitForSource(PcapSessionEventSource src) {
-        try {
-            Thread.sleep(200);
-        } catch (Exception e) {
-        }
-
-        while (src.isWorking()) {
-            try {
-                Thread.sleep(100);
-            } catch (Exception e) {
-            }
-        }
-    }
-}