changeset 984:bbb1dfc08cdb

PcapSessionEventSource - PgSql integration
author Devel 2
date Fri, 16 Aug 2019 14:22:13 +0200
parents d3a25b8f596d
children ed255da90ec3
files stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/source/PcapPgSqlListener.java stress-tester/src/main/java/com/passus/st/source/PcapPgSqlSessionAnalyzerHook.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java stress-tester/src/test/resources/pcap/pgsql/pgsql_session.pcap
diffstat 10 files changed, 252 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java	Wed Aug 14 14:37:32 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Fri Aug 16 14:22:13 2019 +0200
@@ -6,6 +6,8 @@
     public static final int HTTP = 1;
     public static final int DNS = 2;
     public static final int NETFLOW = 3;
+    public static final int PGSQL = 4;
+    public static final int MYSQL = 5;
 
     private Protocols() {
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Wed Aug 14 14:37:32 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Fri Aug 16 14:22:13 2019 +0200
@@ -3,6 +3,7 @@
 import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
 import com.passus.st.client.netflow.NetflowFlowHandler;
+import com.passus.st.client.pgsql.PgSqlFlowHandlerNetflowFlowHandler;
 
 import static com.passus.st.Protocols.*;
 
@@ -17,6 +18,8 @@
                 return new DnsFlowHandler();
             case NETFLOW:
                 return new NetflowFlowHandler();
+            case PGSQL:
+                return new PgSqlFlowHandlerNetflowFlowHandler();
 
         }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataDecoder.java	Fri Aug 16 14:22:13 2019 +0200
@@ -0,0 +1,79 @@
+package com.passus.st.client.pgsql;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.pgsql.PgSqlDecoder;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+
+/**
+ * {@link FlowHandlerDataDecoder} that decodes PostgreSQL messages by delegating to a
+ * {@link PgSqlDecoder}.
+ */
+public class PgSqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<PgSqlMessage> {
+
+    private static final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandlerDataDecoder.class);
+
+    // NOTE(review): the meaning of the "false" constructor argument is not visible here - confirm against PgSqlDecoder.
+    private final PgSqlDecoder decoder = new PgSqlDecoder(false);
+
+    /** Owning handler; its time generator is read when timing information is logged. */
+    private final PgSqlFlowHandlerNetflowFlowHandler handler;
+
+    /**
+     * @param handler flow handler whose {@code timeGenerator} is read when a message is fully decoded
+     */
+    public PgSqlFlowHandlerDataDecoder(PgSqlFlowHandlerNetflowFlowHandler handler) {
+        this.handler = handler;
+    }
+
+    /** @return the result currently held by the underlying decoder */
+    @Override
+    public PgSqlMessage getResult() {
+        return decoder.getResult();
+    }
+
+    /** @return the state reported by the underlying decoder */
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    /** @return the last error reported by the underlying decoder */
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    /**
+     * Feeds {@code buffer} to the underlying decoder and, when decoding has finished and debug
+     * logging is enabled, logs the decode return value (as size) and the time since receiving started.
+     *
+     * @param buffer      data to decode
+     * @param flowContext flow the data belongs to
+     * @return the value returned by the underlying decoder's {@code decode} call
+     */
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        int res = decoder.decode(buffer);
+        // The timestamp is only needed for the log line, so skip reading the clock otherwise.
+        if (decoder.state() == DataDecoder.STATE_FINISHED && LOGGER.isDebugEnabled()) {
+            long now = handler.timeGenerator.currentTimeMillis();
+            debug(LOGGER, flowContext,
+                    "Response decoded (size: {} B, downloaded: {} ms)",
+                    res,
+                    now - flowContext.receivedStartTimestamp()
+            );
+        }
+
+        // TODO: add metrics (responses count, response size, response receiving time, response time).
+
+        return res;
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataEncoder.java	Fri Aug 16 14:22:13 2019 +0200
@@ -0,0 +1,29 @@
+package com.passus.st.client.pgsql;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.pgsql.PgSql;
+import com.passus.net.pgsql.PgSqlEncoder;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+/**
+ * {@link FlowHandlerDataEncoder} that encodes PostgreSQL messages with a {@link PgSqlEncoder}.
+ */
+public class PgSqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<PgSqlMessage> {
+
+    /** Encoder every request is forwarded to. */
+    private final PgSqlEncoder delegate = new PgSqlEncoder();
+
+    /**
+     * Writes {@code request} to {@code out} using the wrapped encoder.
+     *
+     * @param request     message to encode
+     * @param flowContext flow the message belongs to; not used by this implementation
+     * @param out         buffer handed to the wrapped encoder
+     */
+    @Override
+    public void encode(PgSqlMessage request, FlowContext flowContext, ByteBuff out) {
+        delegate.encode(request, out);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Fri Aug 16 14:22:13 2019 +0200
@@ -0,0 +1,93 @@
+package com.passus.st.client.pgsql;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandler;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.client.netflow.NetflowFlowHandlerDataEncoder;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.PGSQL;
+
+/**
+ * {@link FlowHandler} for PostgreSQL flows. It only provides a request encoder; no response
+ * decoder is returned and flows are initialised as non-bidirectional.
+ */
+public class PgSqlFlowHandlerNetflowFlowHandler implements FlowHandler, TimeAware {
+
+    private final PgSqlFlowHandlerDataEncoder encoder;
+
+    // Package-private: read directly by PgSqlFlowHandlerDataDecoder.
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    public PgSqlFlowHandlerNetflowFlowHandler() {
+        this.encoder = new PgSqlFlowHandlerDataEncoder();
+    }
+
+    /**
+     * @return {@code PGSQL}, the protocol id under which FlowHandlerFactoryImpl creates this handler
+     */
+    @Override
+    public int getProtocolId() {
+        // Previously returned NETFLOW, apparently a copy-paste leftover.
+        return PGSQL;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /**
+     * @return always {@code null}; this handler does not supply a response decoder
+     */
+    @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        // NOTE(review): PgSqlFlowHandlerDataDecoder exists in this package but is not wired in here - confirm this is intended.
+        return null;
+    }
+
+    /**
+     * @return the single encoder instance, regardless of {@code flowContext}
+     */
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
+    /** No-op: this handler currently writes no metrics. */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    /**
+     * @param timeGenerator new time source; checked with {@code Assert.notNull} before being stored
+     */
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+    /** Marks the flow as non-bidirectional. */
+    @Override
+    public void init(FlowContext flowContext) {
+        flowContext.setBidirectional(false);
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapPgSqlListener.java	Fri Aug 16 14:22:13 2019 +0200
@@ -0,0 +1,40 @@
+package com.passus.st.source;
+
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.net.session.SessionContext;
+import com.passus.st.client.EventHandler;
+
+import static com.passus.st.Protocols.PGSQL;
+
+/**
+ * Session analyzer listener that turns PostgreSQL request messages into payload events,
+ * registering itself with the base listener under the {@code PGSQL} protocol id.
+ */
+public class PcapPgSqlListener extends BaseSessionAnalyzerListener<PgSqlMessage> {
+
+    /**
+     * @param sourceName    passed through to the base listener
+     * @param eventHandler  passed through to the base listener
+     * @param collectMetric passed through to the base listener
+     * @param metric        passed through to the base listener
+     * @param maxSessionNum currently unused by this class
+     */
+    public PcapPgSqlListener(String sourceName, EventHandler eventHandler,
+                             boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) {
+        super(sourceName, eventHandler, collectMetric, metric, PGSQL);
+    }
+
+    /**
+     * Fires a payload event (message as request, no response) for every request message.
+     * Responses are not processed.
+     */
+    @Override
+    public void onMessageReceived(SessionContext context, PgSqlMessage msg, long timestamp) {
+        // The previous condition was inverted: it dropped requests and forwarded responses,
+        // contradicting the "responses are not processed" intent.
+        if (msg.isRequest()) {
+            firePayloadEvent(msg, null, context, timestamp);
+        }
+    }
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapPgSqlSessionAnalyzerHook.java	Fri Aug 16 14:22:13 2019 +0200
@@ -0,0 +1,44 @@
+package com.passus.st.source;
+
+import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer;
+import com.passus.net.session.SessionAnalyzer;
+
+/**
+ * Hook that gives {@link PgSqlTcpSessionAnalyzer} instances a {@link PcapPgSqlListener} and
+ * registers them with the TCP processor of the hook context.
+ */
+public class PcapPgSqlSessionAnalyzerHook extends PcapSessionAnalyzerHook {
+
+    /**
+     * @return {@code true} when {@code clazz} is {@link PgSqlTcpSessionAnalyzer} or a subclass of it
+     */
+    @Override
+    public boolean supports(Class<? extends SessionAnalyzer> clazz) {
+        final boolean pgSqlAnalyzer = PgSqlTcpSessionAnalyzer.class.isAssignableFrom(clazz);
+        return pgSqlAnalyzer;
+    }
+
+    /**
+     * Creates a listener from the context settings, installs it on the analyzer and then
+     * adds the analyzer to the context's TCP processor.
+     */
+    @Override
+    protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) {
+        analyzer.setListener(new PcapPgSqlListener(
+                context.getSourceName(),
+                context.getEventHandler(),
+                context.isCollectMetric(),
+                context.getMetric(),
+                context.getTcpProcessor().getMaxSessionNumber()));
+        context.getTcpProcessor().addAnalyzer(analyzer);
+    }
+
+    /**
+     * Clears the analyzer's listener and removes the analyzer from the context's TCP processor.
+     */
+    @Override
+    protected void doDetach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) {
+        analyzer.setListener(null);
+        context.getTcpProcessor().removeAnalyzer(analyzer);
+    }
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Wed Aug 14 14:37:32 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Aug 16 14:22:13 2019 +0200
@@ -83,6 +83,7 @@
         hooks.add(new PcapHttpSessionAnalyzerHook());
         hooks.add(new PcapDnsSessionAnalyzerHook());
         hooks.add(new PcapNetflowSessionAnalyzerHook());
+        hooks.add(new PcapPgSqlSessionAnalyzerHook());
 
         analyzers.add(new HttpSessionAnalyzer());
     }
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Wed Aug 14 14:37:32 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java	Fri Aug 16 14:22:13 2019 +0200
@@ -7,6 +7,8 @@
 import com.passus.net.http.HttpResponse;
 import com.passus.net.netflow.Netflow5;
 import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
@@ -135,6 +137,38 @@
         assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);
     }
 
+    /**
+     * Runs a PostgreSQL capture through the source and checks that the first payload event
+     * carries a {@link PgSqlMessage} as its request and no response.
+     */
+    @Test(enabled = true)
+    public void testProcessPgsql() throws Exception {
+        String pcapPath = ResourceUtils.getFile("pcap/pgsql/pgsql_session.pcap").getAbsolutePath();
+
+        PcapSessionEventSource source = new PcapSessionEventSource();
+        source.addAnalyzer(new PgSqlTcpSessionAnalyzer());
+        source.setName("pcapSource");
+        source.setLoops(1);
+        source.setPcapFile(pcapPath);
+
+        ArrayListEventHandler events = new ArrayListEventHandler();
+        source.setHandler(events);
+
+        source.start();
+        waitForSource(source);
+        source.stop();
+
+        SessionPayloadEvent firstPayload = (SessionPayloadEvent) events.findFirst(SessionPayloadEvent.TYPE);
+
+        assertTrue(firstPayload.getRequest() instanceof PgSqlMessage);
+        assertTrue(firstPayload.getResponse() == null);
+
+        // NOTE(review): the end-of-stream checks below are disabled - confirm whether they should be restored.
+        /*assertTrue(events.get(events.size() - 3) instanceof SessionStatusEvent);
+        assertTrue(events.get(events.size() - 2) instanceof DataLoopEnd);
+        assertTrue(events.get(events.size() - 1) instanceof DataEnd);*/
+    }
+
     public static void waitForSource(PcapSessionEventSource src) {
         try {
             Thread.sleep(200);
Binary file stress-tester/src/test/resources/pcap/pgsql/pgsql_session.pcap has changed