Mercurial > stress-tester
changeset 1039:58860e741b9e
NcEventSource, NcEventDestination - PgSql support
author | Devel 2 |
---|---|
date | Fri, 10 Apr 2020 12:47:48 +0200 |
parents | 9dff4810b5b8 |
children | bff9be223040 |
files | stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/utils/EventUtils.java stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/Assert.java stress-tester/src/test/java/com/passus/st/utils/EventUtils.java |
diffstat | 10 files changed, 109 insertions(+), 161 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java Fri Apr 10 10:22:01 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -package com.passus.st.reader.nc; - -import com.passus.net.http.HttpMessage; -import com.passus.st.reader.DataBlockReader; -import java.io.IOException; - -/** - * - * @author Mirosław Hawrot - */ -public class NcHttpDataReader { - - public HttpMessage read(DataBlockReader<NcDataBlock> reader) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadReader.java Fri Apr 10 12:47:48 2020 +0200 @@ -0,0 +1,39 @@ +package com.passus.st.reader.nc; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.net.pgsql.PgSqlDecoder; +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.st.source.NcPayloadReader; + +import java.io.IOException; + +public class NcPgSqlPayloadReader extends NcPayloadReader<PgSqlMessage, PgSqlMessage> { + + private final PgSqlDecoder decoderClient = new PgSqlDecoder(true); + + private final PgSqlDecoder decoderServer = new PgSqlDecoder(); + + private PgSqlMessage decode(ByteBuff payload, PgSqlDecoder decoder) throws IOException { + try { + int res = decoder.decode(payload); + payload.skipBytes(res); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + return decoder.getResult(); + } + throw new IOException("Invalid PgSql data."); + } finally { + decoder.clear(); + } + } + + @Override + protected PgSqlMessage decodeRequest(ByteBuff payload) throws IOException { + return decode(payload, decoderClient); + } + + @Override + protected PgSqlMessage decodeResponse(ByteBuff payload) throws IOException { + return decode(payload, decoderServer); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java Fri Apr 10 12:47:48 2020 +0200 @@ -0,0 +1,13 @@ +package com.passus.st.reader.nc; + +import com.passus.net.pgsql.PgSqlEncoder; +import com.passus.net.pgsql.PgSqlMessage; +import com.passus.st.source.AbstractNcPayloadWriter; + +public class NcPgSqlPayloadWriter extends AbstractNcPayloadWriter<PgSqlMessage, PgSqlMessage> { + + public NcPgSqlPayloadWriter() { + super(new PgSqlEncoder(), new PgSqlEncoder()); + + } +} \ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Fri Apr 10 12:47:48 2020 +0200 @@ -9,10 +9,7 @@ import com.passus.st.client.Event; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.SessionStatusEvent; -import com.passus.st.reader.nc.NcDataBlockWriter; -import com.passus.st.reader.nc.NcDnsPayloadWriter; -import com.passus.st.reader.nc.NcHttpPayloadWriter; -import com.passus.st.reader.nc.NcNetflowPayloadWriter; +import com.passus.st.reader.nc.*; import java.io.File; import java.io.IOException; @@ -40,6 +37,8 @@ private final NcNetflowPayloadWriter netflowWriter = new NcNetflowPayloadWriter(); + private final NcPgSqlPayloadWriter pgSqlWriter = new NcPgSqlPayloadWriter(); + private boolean allowOverwrite; public NcEventDestination() { @@ -132,6 +131,9 @@ case NETFLOW: netflowWriter.write(event, writer); break; + case PGSQL: + pgSqlWriter.write(event, writer); + break; } } @@ -156,7 +158,6 @@ } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } - } }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri Apr 10 12:47:48 2020 +0200 @@ -58,6 +58,8 @@ private NcDnsPayloadReader dnsReader = new NcDnsPayloadReader(); + private NcPgSqlPayloadReader pgSqlReader = new NcPgSqlPayloadReader(); + private String name = UniqueIdGenerator.generate(); private ReaderThread readerThread; @@ -253,6 +255,9 @@ case NETFLOW: messages = netflowReader.read(payload); break; + case PGSQL: + messages = pgSqlReader.read(payload); + break; default: if (LOGGER.isDebugEnabled()) { LOGGER.debug("Not supported protocol {}.", payloadBlock.proto());
--- a/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java Fri Apr 10 12:47:48 2020 +0200 @@ -6,6 +6,7 @@ import com.passus.net.dns.session.DnsUdpSessionAnalyzer; import com.passus.net.http.session.HttpSessionAnalyzer; import com.passus.net.netflow.NetflowUdpSessionAnalyzer; +import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer; import com.passus.net.session.SessionAnalyzer; import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; @@ -57,28 +58,27 @@ src.setAllowPartialSession(allowPartialSession); src.setPcapFile(pcapFile.getAbsolutePath()); - SessionAnalyzer analyzer; if (protocols.contains(Protocols.HTTP)) { - analyzer = new HttpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } + SessionAnalyzer analyzer = new HttpSessionAnalyzer(); + analyzer.getPortsRange().add(80).add(8080).add(4214); src.addAnalyzer(analyzer); } if (protocols.contains(Protocols.DNS)) { - analyzer = new DnsUdpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } + SessionAnalyzer analyzer = new DnsUdpSessionAnalyzer(); + analyzer.getPortsRange().add(53); src.addAnalyzer(analyzer); } if (protocols.contains(Protocols.NETFLOW)) { - analyzer = new NetflowUdpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } + SessionAnalyzer analyzer = new NetflowUdpSessionAnalyzer(); + analyzer.getPortsRange().add(2055); + src.addAnalyzer(analyzer); + } + + if (protocols.contains(Protocols.PGSQL)) { + SessionAnalyzer analyzer = new PgSqlTcpSessionAnalyzer(); + analyzer.getPortsRange().add(5432); src.addAnalyzer(analyzer); }
--- a/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java Fri Apr 10 12:47:48 2020 +0200 @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static com.passus.st.Protocols.HTTP; import static com.passus.st.utils.Assert.assertHttpClientEvents; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; @@ -281,7 +282,7 @@ events.forEach(worker::handle); join(worker); - assertHttpClientEvents(events, listener.events()); + assertHttpClientEvents(events, listener.events(), HTTP); } @Test @@ -292,7 +293,7 @@ worker.start(); events.forEach(worker::handle); join(worker); - assertHttpClientEvents(events, listener.events()); + assertHttpClientEvents(events, listener.events(), HTTP); } @Test
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Apr 10 12:47:48 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.PooledByteBuffAllocator; import com.passus.net.PortRangeSet; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event; @@ -16,6 +17,7 @@ import java.util.Map; import static com.passus.commons.utils.ResourceUtils.createTmpFile; +import static com.passus.st.Protocols.*; import static com.passus.st.utils.Assert.assertEquals; import static com.passus.st.utils.Assert.assertEvents; @@ -24,9 +26,13 @@ */ public class NcEventSourceTest { + static { + Log4jConfigurationFactory.enableFactory("trace"); + } + private FileEvents writeEvents(String pcapFile) throws IOException { PortRangeSet portRanges = new PortRangeSet(); - portRanges.add(4214).add(53).add(2055); + portRanges.add(4214).add(8080).add(53).add(2055).add(5432); Map<String, Object> props = new HashMap<>(); props.put("allowPartialSession", true); props.put("ports", portRanges); @@ -45,18 +51,19 @@ @DataProvider(name = "pcapFiles") public Object[][] pcapFiles() { return new Object[][]{ - {"pcap/http/http_req_resp.pcap"}, - {"pcap/http/http_ndiag_tcp_conn.pcap"}, - {"pcap/http/basic_digest.pcap"}, - {"pcap/http/http_get_png.pcap"}, - {"pcap/http/http_1.pcap"}, - {"pcap/dns/dns_A_req_resp.pcap"}, - {"pcap/netflow/netflow_v5.pcap"} + {"pcap/http/http_req_resp.pcap", HTTP}, + {"pcap/http/http_ndiag_tcp_conn.pcap", HTTP}, + {"pcap/http/basic_digest.pcap", HTTP}, + {"pcap/http/http_get_png.pcap", HTTP}, + {"pcap/http/http_1.pcap", HTTP}, + {"pcap/dns/dns_A_req_resp.pcap", DNS}, + {"pcap/netflow/netflow_v5.pcap", NETFLOW}, + {"pcap/pgsql/pgsql_session.pcap", PGSQL} }; } @Test(dataProvider = "pcapFiles") - public void testWriteAndRead(String pcapFile) throws Exception { + public void testWriteAndRead(String pcapFile, int proto) throws Exception { FileEvents fileEvents = writeEvents(pcapFile); try { ArrayListEventHandler handler = new ArrayListEventHandler(); @@ -66,7 +73,7 @@ eventSource.start(); List<Event> events = handler.getEvents(); - assertEvents(fileEvents.events, events); + assertEvents(fileEvents.events, events, proto); } finally { fileEvents.ncFile.delete(); } @@ -85,7 +92,7 @@ eventSource.start(); List<Event> events = handler.getEvents(); - assertEvents(fileEvents.events, events); + assertEvents(fileEvents.events, events, HTTP); assertEquals(2, allocator.usedSize()); } finally { fileEvents.ncFile.delete();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java Fri Apr 10 10:22:01 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java Fri Apr 10 12:47:48 2020 +0200 @@ -40,7 +40,7 @@ assertEquals(null, expectedBs, actual); } - public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) { + public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents, int expectedProtocolId) { expectedEvents = expectedEvents.stream() .filter((e) -> e.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) e).getProtocolId() == HTTP) .collect(Collectors.toList()); @@ -54,21 +54,27 @@ } }); - assertEvents(expectedEvents, events); + assertEvents(expectedEvents, events, expectedProtocolId); } - public static void assertEvents(List<Event> expectedEvents, List<Event> events) { + public static void assertEvents(List<Event> expectedEvents, List<Event> events, int expectedProtocolId) { assertEquals(expectedEvents.size(), events.size()); + + int payloadEvents = 0; for (int i = 0; i < events.size(); i++) { Event expectedEvent = expectedEvents.get(i); Event event = events.get(i); assertEquals(expectedEvent.getType(), event.getType()); if (event.getType() == SessionPayloadEvent.TYPE) { + payloadEvents++; SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent; SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; - if(((SessionPayloadEvent) event).getProtocolId() == HTTP) { + int protocolId = ((SessionPayloadEvent) event).getProtocolId(); + assertEquals(expectedProtocolId, protocolId); + + if (((SessionPayloadEvent) event).getProtocolId() == HTTP) { assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); @@ -79,6 +85,8 @@ } } } + + assertTrue("No session payload events.", payloadEvents > 0); } public static void assertNoError(Errors errors) {
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java Fri Apr 10 10:22:01 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,109 +0,0 @@ -package com.passus.st.utils; - -import com.passus.commons.utils.ArrayUtils; -import com.passus.commons.utils.ResourceUtils; -import com.passus.net.PortRangeSet; -import com.passus.net.dns.session.DnsUdpSessionAnalyzer; -import com.passus.net.http.session.HttpSessionAnalyzer; -import com.passus.net.netflow.NetflowUdpSessionAnalyzer; -import com.passus.net.session.SessionAnalyzer; -import com.passus.st.Protocols; -import com.passus.st.client.ArrayListEventHandler; -import com.passus.st.client.Event; -import com.passus.st.source.PcapSessionEventSource; - -import java.io.File; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * @author Mirosław Hawrot - */ -public class EventUtils { - - private EventUtils() { - } - - public static List<Event> readEvents(String pcapRelativeFile) { - return readEvents(pcapRelativeFile, null); - } - - public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) { - try { - boolean allowPartialSession = false; - Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP); - PortRangeSet portsRange = null; - if (props != null) { - allowPartialSession = (boolean) props.getOrDefault("allowPartialSession", false); - - Object ports = props.getOrDefault("ports", null); - if (ports instanceof PortRangeSet) { - portsRange = (PortRangeSet) ports; - } else if (ports instanceof Integer) { - portsRange = new PortRangeSet(); - portsRange.add((Integer) ports); - } - - protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP)); - } - - File pcapFile = ResourceUtils.getFile(pcapRelativeFile); - PcapSessionEventSource src = new PcapSessionEventSource(); - src.setAllowPartialSession(allowPartialSession); - src.setPcapFile(pcapFile.getAbsolutePath()); - - SessionAnalyzer analyzer; - if (protocols.contains(Protocols.HTTP)) { - analyzer = new HttpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } - src.addAnalyzer(analyzer); - } - - if (protocols.contains(Protocols.DNS)) { - analyzer = new DnsUdpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } - src.addAnalyzer(analyzer); - } - - if (protocols.contains(Protocols.NETFLOW)) { - analyzer = new NetflowUdpSessionAnalyzer(); - if (portsRange != null) { - analyzer.getPortsRange().addAll(portsRange); - } - src.addAnalyzer(analyzer); - } - - src.setLoops(1); - - ArrayListEventHandler handler = new ArrayListEventHandler(); - src.setHandler(handler); - - src.start(); - waitForSource(src); - src.stop(); - - return handler.getEvents(); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - public static void waitForSource(PcapSessionEventSource src) { - try { - Thread.sleep(200); - } catch (Exception e) { - } - - while (src.isWorking()) { - try { - Thread.sleep(100); - } catch (Exception e) { - } - } - } -}