Mercurial > stress-tester
changeset 1038:9dff4810b5b8
NcEventSource - DNS and Netflow support
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Fri Apr 10 10:22:01 2020 +0200 @@ -1,5 +1,9 @@ package com.passus.st; +import com.passus.commons.utils.ArrayUtils; + +import java.util.Set; + public class Protocols { public static final int UNKNOWN = 0; @@ -12,6 +16,10 @@ private Protocols() { } + public static Set<Integer> getAllProtocols() { + return ArrayUtils.asSet(HTTP, DNS, NETFLOW, PGSQL, MYSQL); + } + public static String protocolToString(int protocolId) { switch (protocolId) { case HTTP:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDnsPayloadReader.java Fri Apr 10 10:22:01 2020 +0200 @@ -0,0 +1,37 @@ +package com.passus.st.reader.nc; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.net.dns.Dns; +import com.passus.net.dns.DnsDecoder; +import com.passus.st.source.NcPayloadReader; + +import java.io.IOException; + +public class NcDnsPayloadReader extends NcPayloadReader<Dns, Dns> { + + private final DnsDecoder decoder = new DnsDecoder(); + + private Dns decode(ByteBuff payload) throws IOException { + try { + int res = decoder.decode(payload); + payload.skipBytes(res); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + return decoder.getResult(); + } + throw new IOException("Invalid Dns data."); + } finally { + decoder.clear(); + } + } + + @Override + protected Dns decodeRequest(ByteBuff payload) throws IOException { + return decode(payload); + } + + @Override + protected Dns decodeResponse(ByteBuff payload) throws IOException { + return decode(payload); + } +}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpPayloadReader.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpPayloadReader.java Fri Apr 10 10:22:01 2020 +0200 @@ -3,20 +3,16 @@ import com.passus.commons.Assert; import com.passus.data.*; import com.passus.net.http.*; -import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.client.http.HttpReqResp; import com.passus.st.source.NcPayloadReader; import java.io.IOException; -import static com.passus.st.reader.nc.NcDataUtils.*; +import static com.passus.st.reader.nc.NcDataUtils.CUSTOM_HEADER_CODE; /** * @author Mirosław Hawrot */ -public class NcHttpPayloadReader implements NcPayloadReader { - - private final NcDataHelper ncDataHelper = NcDataHelper.getInstance(); +public class NcHttpPayloadReader extends NcPayloadReader<HttpRequest, HttpResponse> { private ByteBuffAllocator allocator = new DefaultByteBuffAllocator(); @@ -29,20 +25,6 @@ this.allocator = allocator; } - private void skipRequest(ByteBuff buffer) throws IOException { - ncDataHelper.skipByteStringNullTerminated(buffer); - ncDataHelper.skipByteStringNullTerminated(buffer); - buffer.read(); - skipHeaders(buffer); - } - - private void skipResponse(ByteBuff buffer) throws IOException { - buffer.skipBytes(2); - ncDataHelper.skipByteStringNullTerminated(buffer); - buffer.read(); - skipHeaders(buffer); - } - public int decodeVersion(ByteBuff buffer) throws IOException { byte b = buffer.read(); if (b == NcDataUtils.VERSION_1_0) { @@ -93,7 +75,7 @@ } } - public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException { + private HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException { long headerSize = ncDataHelper.readLongVLC(buffer); HttpHeaders headers; if (HttpHeadersDecoder.isDefaultEnableCachedHeaders()) { @@ -145,7 +127,7 @@ } } - public HttpRequest decodeRequest(ByteBuff buffer) throws IOException { + private HttpRequest decodeRequestHeaders(ByteBuff buffer) throws IOException { HttpMethod method = decodeMethod(buffer); ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer); HttpRequest req = new HttpRequest(uri, method); @@ -154,13 +136,13 @@ return req; } - public HttpRequest decodeFullRequest(ByteBuff buffer) throws IOException { - HttpRequest req = decodeRequest(buffer); + protected HttpRequest decodeRequest(ByteBuff buffer) throws IOException { + HttpRequest req = decodeRequestHeaders(buffer); req.setContent(decodeContent(buffer)); return req; } - public HttpResponse decodeResponse(ByteBuff buffer) throws IOException { + private HttpResponse decodeResponseHeaders(ByteBuff buffer) throws IOException { int statusCode = ncDataHelper.readInt2(buffer); ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer); HttpStatus status = new HttpStatus(statusCode, reasonPhrase); @@ -170,56 +152,10 @@ return resp; } - public HttpResponse decodeFullResponse(ByteBuff buffer) throws IOException { - HttpResponse resp = decodeResponse(buffer); + protected HttpResponse decodeResponse(ByteBuff buffer) throws IOException { + HttpResponse resp = decodeResponseHeaders(buffer); resp.setContent(decodeContent(buffer)); return resp; } - public HttpMessage decodeMessage(ByteBuff buffer) throws IOException { - byte flags = buffer.read(); - - HttpMessage msg; - if ((flags & FLAG_REQUEST) != 0) { - msg = decodeRequest(buffer); - } else { - msg = decodeResponse(buffer); - } - - return msg; - } - - public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException { - return decodeMessages(buffer, false, false); - } - - public HttpReqResp decodeMessages(ByteBuff buffer, boolean skipRequest, boolean skipResponse) throws IOException { - byte flags = buffer.read(); - HttpRequest req = null; - if ((flags & FLAG_REQUEST) != 0) { - if (skipRequest) { - skipRequest(buffer); - } else { - req = decodeFullRequest(buffer); - } - } - - HttpResponse resp = null; - if ((flags & FLAG_RESPONSE) != 0) { - if (skipResponse) { - skipResponse(buffer); - } else { - resp = decodeFullResponse(buffer); - } - } - - buffer.release(); - return new HttpReqResp(req, resp); - } - - @Override - public SessionPayloadEvent read(NcDataBlockReader reader) throws IOException { - return null; - } - }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcNetflowPayloadReader.java Fri Apr 10 10:22:01 2020 +0200 @@ -0,0 +1,57 @@ +package com.passus.st.reader.nc; + +import com.passus.data.ByteBuff; +import com.passus.data.DataUtils; +import com.passus.net.dns.DnsDecoder; +import com.passus.net.netflow.Netflow; +import com.passus.net.netflow.Netflow5Decoder; +import com.passus.net.netflow.Netflow9Decoder; +import com.passus.net.netflow.NetflowUtils; +import com.passus.st.source.NcPayloadReader; + +import java.io.IOException; + +public class NcNetflowPayloadReader extends NcPayloadReader<Netflow, Netflow> { + + private final Netflow5Decoder decoder5 = new Netflow5Decoder(); + + private final Netflow9Decoder decoder9 = new Netflow9Decoder(); + + @Override + protected Netflow decodeRequest(ByteBuff payload) throws IOException { + int version = DataUtils.getInt2(payload); + payload.skipBytes(2); + if (version == NetflowUtils.VERSION_5) { + try { + int res = decoder5.decode0(payload.buffer(), payload.startIndex(), payload.readableBytes()); + payload.skipBytes(res); + if (decoder5.state() == DnsDecoder.STATE_FINISHED) { + return decoder5.getResult(); + } else { + throw new IOException("Invalid Netflow v5 data."); + } + } finally { + decoder5.clear(); + } + } else if (version == NetflowUtils.VERSION_9) { + try { + int res = decoder9.decode0(payload.buffer(), payload.startIndex(), payload.readableBytes()); + payload.skipBytes(res); + if (decoder9.state() == DnsDecoder.STATE_FINISHED) { + return decoder9.getResult(); + } else { + throw new IOException("Invalid Netflow v9 data."); + } + } finally { + decoder9.clear(); + } + } + + return null; + } + + @Override + protected Netflow decodeResponse(ByteBuff payload) throws IOException { + return null; + } +}
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri Apr 10 10:22:01 2020 +0200 @@ -23,10 +23,7 @@ import com.passus.st.client.SessionStatusEvent; import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; -import com.passus.st.reader.nc.NcDataBlockReader; -import com.passus.st.reader.nc.NcHttpPayloadReader; -import com.passus.st.reader.nc.NcSessionPayloadBlock; -import com.passus.st.reader.nc.NcSessionStatusBlock; +import com.passus.st.reader.nc.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +31,7 @@ import java.io.IOException; import static com.passus.config.schema.ConfigurationSchemaBuilder.*; -import static com.passus.st.Protocols.HTTP; +import static com.passus.st.Protocols.*; /** * @author Mirosław Hawrot @@ -57,6 +54,10 @@ private NcHttpPayloadReader httpReader = new NcHttpPayloadReader(); + private NcNetflowPayloadReader netflowReader = new NcNetflowPayloadReader(); + + private NcDnsPayloadReader dnsReader = new NcDnsPayloadReader(); + private String name = UniqueIdGenerator.generate(); private ReaderThread readerThread; @@ -241,13 +242,26 @@ sessionInfo.setSourceName(getName()); ByteBuff payload = readPayload(payloadBlock.data()); - ReqRespPair messages = null; - if (payloadBlock.proto() == HTTP) { - messages = httpReader.decodeMessages(payload); + ReqRespPair messages; + switch (payloadBlock.proto()) { + case HTTP: + messages = httpReader.read(payload); + break; + case DNS: + messages = dnsReader.read(payload); + break; + case NETFLOW: + messages = netflowReader.read(payload); + break; + default: + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Not supported protocol {}.", payloadBlock.proto()); + } + + return; } - - handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), HTTP, getName())); + handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), payloadBlock.proto(), getName())); break; default: reader.read();
--- a/stress-tester/src/main/java/com/passus/st/source/NcPayloadReader.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcPayloadReader.java Fri Apr 10 10:22:01 2020 +0200 @@ -1,12 +1,36 @@ package com.passus.st.source; -import com.passus.st.client.SessionPayloadEvent; -import com.passus.st.reader.nc.NcDataBlockReader; +import com.passus.data.ByteBuff; +import com.passus.st.ReqRespPair; +import com.passus.st.reader.nc.NcDataHelper; import java.io.IOException; -public interface NcPayloadReader { +import static com.passus.st.reader.nc.NcDataUtils.FLAG_REQUEST; +import static com.passus.st.reader.nc.NcDataUtils.FLAG_RESPONSE; - SessionPayloadEvent read(NcDataBlockReader reader) throws IOException; - +public abstract class NcPayloadReader<R, S> { + + protected final NcDataHelper ncDataHelper = NcDataHelper.getInstance(); + + protected abstract R decodeRequest(ByteBuff payload) throws IOException; + + protected abstract S decodeResponse(ByteBuff payload) throws IOException; + + public ReqRespPair<R, S> read(ByteBuff payload) throws IOException { + byte flags = payload.read(); + R req = null; + S resp = null; + if ((flags & FLAG_REQUEST) != 0) { + req = decodeRequest(payload); + } + + if ((flags & FLAG_RESPONSE) != 0) { + resp = decodeResponse(payload); + } + + payload.release(); + return new ReqRespPair(req, resp); + } + }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadReaderTest.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadReaderTest.java Fri Apr 10 10:22:01 2020 +0200 @@ -2,18 +2,18 @@ import com.passus.data.ByteBuff; import com.passus.data.HeapByteBuff; -import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpRequestBuilder; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseBuilder; -import org.testng.annotations.Test; -import static com.passus.st.utils.HttpMessageAssert.*; +import com.passus.st.ReqRespPair; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import static com.passus.st.utils.HttpMessageAssert.assertMessages; /** - * * @author Mirosław Hawrot */ public class NcHttpPayloadReaderTest { @@ -51,8 +51,8 @@ writer.encodeMessage(req, buffer); NcHttpPayloadReader reader = new NcHttpPayloadReader(); - HttpMessage msgDecoded = reader.decodeMessage(buffer); - assertMessages(req, msgDecoded); + ReqRespPair<HttpRequest, HttpResponse> msgDecoded = reader.read(buffer); + assertMessages(req, msgDecoded.getRequest()); } @Test @@ -60,8 +60,8 @@ writer.encodeMessage(resp, buffer); NcHttpPayloadReader reader = new NcHttpPayloadReader(); - HttpMessage msgDecoded = reader.decodeMessage(buffer); - assertMessages(resp, msgDecoded); + ReqRespPair<HttpRequest, HttpResponse> msgDecoded = reader.read(buffer); + assertMessages(resp, msgDecoded.getResponse()); } @Test
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadWriterTest.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadWriterTest.java Fri Apr 10 10:22:01 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.HeapByteBuff; import com.passus.net.http.*; +import com.passus.st.ReqRespPair; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.client.http.HttpReqResp; import com.passus.st.emitter.SessionInfo; @@ -85,7 +86,7 @@ NcSessionPayloadBlock payload = readPayloadBlock(file); NcHttpPayloadReader payloadReader = new NcHttpPayloadReader(); - HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data())); + ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data())); assertFalse(reqResp == null); assertTrue(reqResp.getResponse() == null); assertMessages(req, reqResp.getRequest()); @@ -107,7 +108,7 @@ NcSessionPayloadBlock payload = readPayloadBlock(file); NcHttpPayloadReader payloadReader = new NcHttpPayloadReader(); - HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data())); + ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data())); assertFalse(reqResp == null); assertTrue(reqResp.getResponse() == null); assertMessages(req, reqResp.getRequest()); @@ -129,7 +130,7 @@ NcSessionPayloadBlock payload = readPayloadBlock(file); NcHttpPayloadReader payloadReader = new NcHttpPayloadReader(); - HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data())); + ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data())); assertFalse(reqResp == null); assertTrue(reqResp.getResponse() == null); assertMessages(req, reqResp.getRequest()); @@ -151,7 +152,7 @@ NcSessionPayloadBlock payload = readPayloadBlock(file); NcHttpPayloadReader payloadReader = new NcHttpPayloadReader(); - HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data())); + ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data())); assertFalse(reqResp == null); assertTrue(reqResp.getRequest() == null); assertMessages(resp, reqResp.getResponse()); @@ -169,7 +170,7 @@ NcSessionPayloadBlock payload = readPayloadBlock(file); NcHttpPayloadReader payloadReader = new NcHttpPayloadReader(); - HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data())); + ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data())); assertFalse(reqResp == null); assertTrue(reqResp.getRequest() == null); assertTrue(reqResp.getResponse() == null);
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Apr 10 10:22:01 2020 +0200 @@ -1,30 +1,36 @@ package com.passus.st.source; -import static com.passus.commons.utils.ResourceUtils.createTmpFile; import com.passus.data.PooledByteBuffAllocator; +import com.passus.net.PortRangeSet; +import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event; import com.passus.st.utils.EventUtils; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import static com.passus.st.utils.Assert.*; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; + +import static com.passus.commons.utils.ResourceUtils.createTmpFile; +import static com.passus.st.utils.Assert.assertEquals; +import static com.passus.st.utils.Assert.assertEvents; /** - * * @author Mirosław Hawrot */ public class NcEventSourceTest { private FileEvents writeEvents(String pcapFile) throws IOException { + PortRangeSet portRanges = new PortRangeSet(); + portRanges.add(4214).add(53).add(2055); Map<String, Object> props = new HashMap<>(); props.put("allowPartialSession", true); - props.put("ports", 4214); + props.put("ports", portRanges); + props.put("protocols", Protocols.getAllProtocols()); List<Event> events = EventUtils.readEvents(pcapFile, props); File tmpFile = createTmpFile(false); @@ -39,16 +45,18 @@ @DataProvider(name = "pcapFiles") public Object[][] pcapFiles() { return new Object[][]{ - {"pcap/http/http_req_resp.pcap"}, - {"pcap/http/http_ndiag_tcp_conn.pcap"}, - {"pcap/http/basic_digest.pcap"}, - {"pcap/http/http_get_png.pcap"}, - {"pcap/http/http_1.pcap"} + {"pcap/http/http_req_resp.pcap"}, + {"pcap/http/http_ndiag_tcp_conn.pcap"}, + {"pcap/http/basic_digest.pcap"}, + {"pcap/http/http_get_png.pcap"}, + {"pcap/http/http_1.pcap"}, + {"pcap/dns/dns_A_req_resp.pcap"}, + {"pcap/netflow/netflow_v5.pcap"} }; } @Test(dataProvider = "pcapFiles") - public void testRead(String pcapFile) throws Exception { + public void testWriteAndRead(String pcapFile) throws Exception { FileEvents fileEvents = writeEvents(pcapFile); try { ArrayListEventHandler handler = new ArrayListEventHandler();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java Thu Apr 09 13:49:19 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java Fri Apr 10 10:22:01 2020 +0200 @@ -64,14 +64,19 @@ Event event = events.get(i); assertEquals(expectedEvent.getType(), event.getType()); - if (event.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) event).getProtocolId() == HTTP) { + if (event.getType() == SessionPayloadEvent.TYPE) { SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent; SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; - assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); - assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); - assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); - assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + if(((SessionPayloadEvent) event).getProtocolId() == HTTP) { + assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + } else { + assertEquals(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertEquals(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + } } } }