Mercurial > stress-tester
changeset 984:bbb1dfc08cdb
PcapSessionEventSource - PgSql integration
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java Wed Aug 14 14:37:32 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Fri Aug 16 14:22:13 2019 +0200 @@ -6,6 +6,8 @@ public static final int HTTP = 1; public static final int DNS = 2; public static final int NETFLOW = 3; + public static final int PGSQL = 4; + public static final int MYSQL = 5; private Protocols() { }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Wed Aug 14 14:37:32 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Fri Aug 16 14:22:13 2019 +0200 @@ -3,6 +3,7 @@ import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; import com.passus.st.client.netflow.NetflowFlowHandler; +import com.passus.st.client.pgsql.PgSqlFlowHandlerNetflowFlowHandler; import static com.passus.st.Protocols.*; @@ -17,6 +18,8 @@ return new DnsFlowHandler(); case NETFLOW: return new NetflowFlowHandler(); + case PGSQL: + return new PgSqlFlowHandlerNetflowFlowHandler(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataDecoder.java Fri Aug 16 14:22:13 2019 +0200 @@ -0,0 +1,72 @@ +package com.passus.st.client.pgsql; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.net.pgsql.PgSqlDecoder; +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import static com.passus.st.client.FlowUtils.debug; + +public class PgSqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<PgSqlMessage> { + + private static final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandlerDataDecoder.class); + + private PgSqlDecoder decoder = new PgSqlDecoder(false); + + private final PgSqlFlowHandlerNetflowFlowHandler handler; + + public PgSqlFlowHandlerDataDecoder(PgSqlFlowHandlerNetflowFlowHandler handler) { + this.handler = handler; + } + + @Override + public PgSqlMessage getResult() { + return decoder.getResult(); + } + + @Override + public int state() { + return decoder.state(); + } + + @Override + public String getLastError() { + return decoder.getLastError(); + } + + @Override + public int decode(ByteBuff buffer, FlowContext flowContext) { + int res = decoder.decode(buffer); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + long now = handler.timeGenerator.currentTimeMillis(); + + if (LOGGER.isDebugEnabled()) { + debug(LOGGER, flowContext, + "Response decoded (size: {} B, downloaded: {} ms)", + res, + now - flowContext.receivedStartTimestamp() + ); + } + + //TODO Dorobic metryki + /*if (client.isCollectMetrics()) { + synchronized (client.metric) { + client.metric.incResponsesNum(); + client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); + client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); + if (lastRequest != null) { + client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); + } + } + }*/ + } + + return res; + } + + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerDataEncoder.java Fri Aug 16 14:22:13 2019 +0200 @@ -0,0 +1,18 @@ +package com.passus.st.client.pgsql; + +import com.passus.data.ByteBuff; +import com.passus.net.pgsql.PgSql; +import com.passus.net.pgsql.PgSqlEncoder; +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataEncoder; + +public class PgSqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<PgSqlMessage> { + + private final PgSqlEncoder encoder = new PgSqlEncoder(); + + @Override + public void encode(PgSqlMessage request, FlowContext flowContext, ByteBuff out) { + encoder.encode(request, out); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java Fri Aug 16 14:22:13 2019 +0200 @@ -0,0 +1,74 @@ +package com.passus.st.client.pgsql; + +import com.passus.commons.Assert; +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandler; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; +import com.passus.st.client.netflow.NetflowFlowHandlerDataEncoder; +import com.passus.st.metric.MetricsContainer; + +import static com.passus.st.Protocols.NETFLOW; + +public class PgSqlFlowHandlerNetflowFlowHandler implements FlowHandler, TimeAware { + + private final PgSqlFlowHandlerDataEncoder encoder; + + TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + + boolean collectMetrics = false; + + public PgSqlFlowHandlerNetflowFlowHandler() { + this.encoder = new PgSqlFlowHandlerDataEncoder(); + } + + @Override + public int getProtocolId() { + return NETFLOW; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { + return null; + } + + @Override + public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { + return encoder; + } + + @Override + public void writeMetrics(MetricsContainer container) { + + } + + @Override + public TimeGenerator getTimeGenerator() { + return timeGenerator; + } + + @Override + public void setTimeGenerator(TimeGenerator timeGenerator) { + Assert.notNull(timeGenerator, "timeGenerator"); + this.timeGenerator = timeGenerator; + } + + @Override + public void init(FlowContext flowContext) { + flowContext.setBidirectional(false); + } + + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapPgSqlListener.java Fri Aug 16 14:22:13 2019 +0200 @@ -0,0 +1,24 @@ +package com.passus.st.source; + +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.net.session.SessionContext; +import com.passus.st.client.EventHandler; + +import static com.passus.st.Protocols.PGSQL; + +public class PcapPgSqlListener extends BaseSessionAnalyzerListener<PgSqlMessage> { + + public PcapPgSqlListener(String sourceName, EventHandler eventHandler, + boolean collectMetric, PcapSessionEventSourceMetric metric, int maxSessionNum) { + super(sourceName, eventHandler, collectMetric, metric, PGSQL); + } + + @Override + public void onMessageReceived(SessionContext context, PgSqlMessage msg, long timestamp) { + //Nie przetwarzamy odpowiedzi + if (!msg.isRequest()) { + firePayloadEvent(msg, null, context, timestamp); + } + } + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapPgSqlSessionAnalyzerHook.java Fri Aug 16 14:22:13 2019 +0200 @@ -0,0 +1,30 @@ +package com.passus.st.source; + +import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer; +import com.passus.net.session.SessionAnalyzer; + +public class PcapPgSqlSessionAnalyzerHook extends PcapSessionAnalyzerHook { + + @Override + public boolean supports(Class<? extends SessionAnalyzer> clazz) { + return PgSqlTcpSessionAnalyzer.class.isAssignableFrom(clazz); + } + + @Override + protected void doAttach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) { + PcapPgSqlListener listener = new PcapPgSqlListener(context.getSourceName(), + context.getEventHandler(), + context.isCollectMetric(), + context.getMetric(), + context.getTcpProcessor().getMaxSessionNumber()); + analyzer.setListener(listener); + + context.getTcpProcessor().addAnalyzer(analyzer); + } + + @Override + protected void doDetach(SessionAnalyzer analyzer, PcapSessionAnalyzerHookContext context) { + analyzer.setListener(null); + context.getTcpProcessor().removeAnalyzer(analyzer); + } +} \ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Wed Aug 14 14:37:32 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Fri Aug 16 14:22:13 2019 +0200 @@ -83,6 +83,7 @@ hooks.add(new PcapHttpSessionAnalyzerHook()); hooks.add(new PcapDnsSessionAnalyzerHook()); hooks.add(new PcapNetflowSessionAnalyzerHook()); + hooks.add(new PcapPgSqlSessionAnalyzerHook()); analyzers.add(new HttpSessionAnalyzer()); }
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Wed Aug 14 14:37:32 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Fri Aug 16 14:22:13 2019 +0200 @@ -7,6 +7,8 @@ import com.passus.net.http.HttpResponse; import com.passus.net.netflow.Netflow5; import com.passus.net.netflow.NetflowUdpSessionAnalyzer; +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; @@ -135,6 +137,32 @@ assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); } + @Test(enabled = true) + public void testProcessPgsql() throws Exception { + File pcapFile = ResourceUtils.getFile("pcap/pgsql/pgsql_session.pcap"); + PcapSessionEventSource src = new PcapSessionEventSource(); + src.addAnalyzer(new PgSqlTcpSessionAnalyzer()); + src.setName("pcapSource"); + src.setLoops(1); + src.setPcapFile(pcapFile.getAbsolutePath()); + + ArrayListEventHandler handler = new ArrayListEventHandler(); + src.setHandler(handler); + + src.start(); + waitForSource(src); + src.stop(); + + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE); + + assertTrue(payloadEvent.getRequest() instanceof PgSqlMessage); + assertTrue(payloadEvent.getResponse() == null); + + /*assertTrue(handler.get(handler.size() - 3) instanceof SessionStatusEvent); + assertTrue(handler.get(handler.size() - 2) instanceof DataLoopEnd); + assertTrue(handler.get(handler.size() - 1) instanceof DataEnd);*/ + } + public static void waitForSource(PcapSessionEventSource src) { try { Thread.sleep(200);