Mercurial > stress-tester
changeset 704:b5062e521212
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Tue Nov 28 14:44:14 2017 +0100 @@ -1,8 +1,11 @@ package com.passus.st.reader.nc; import com.passus.data.ByteBuff; +import com.passus.data.ByteBuffDataSource; import com.passus.data.ByteString; import com.passus.data.DataHelper; +import com.passus.data.DataSource; +import com.passus.data.HeapByteBuff; import com.passus.net.http.HttpConsts; import com.passus.net.http.HttpHeaders; import com.passus.net.http.HttpMessage; @@ -26,6 +29,20 @@ private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; + private void skipRequest(ByteBuff buffer) throws IOException { + ncDataHelper.skipByteStringNullTerminated(buffer); + ncDataHelper.skipByteStringNullTerminated(buffer); + buffer.read(); + skipHeaders(buffer); + } + + private void skipResponse(ByteBuff buffer) throws IOException { + buffer.skipBytes(2); + ncDataHelper.skipByteStringNullTerminated(buffer); + buffer.read(); + skipHeaders(buffer); + } + public ByteString decodeVersion(ByteBuff buffer) throws IOException { byte b = buffer.read(); if (b == NcHttpDataUtils.VERSION_1_0) { @@ -37,6 +54,15 @@ throw new IOException("Not supported HTTP version '" + b + "'."); } + private void skipHeaders(ByteBuff buffer) throws IOException { + long headerSize = ncDataHelper.readLongVLC(buffer); + if (headerSize <= Integer.MAX_VALUE) { + buffer.skipBytes((int) headerSize); + } else { + throw new IOException("Too big header size > Integer.MAX_VALUE"); + } + } + public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException { long headerSize = ncDataHelper.readLongVLC(buffer); int startIndex = buffer.startIndex(); @@ -65,17 +91,49 @@ return headers; } + private DataSource decodeContent(ByteBuff buffer) throws IOException { + long contentSize = ncDataHelper.readLongVLC(buffer); + if (contentSize == 0) { + return null; + } else if (contentSize <= Integer.MAX_VALUE) { + int contentSizeInt = (int) contentSize; + ByteBuff data = new HeapByteBuff(contentSizeInt); + buffer.read(data, contentSizeInt); + return new ByteBuffDataSource(data); + } else { + throw new IOException("Too big content size > Integer.MAX_VALUE"); + } + } + public HttpRequest decodeRequest(ByteBuff buffer) throws IOException { ByteString method = ncDataHelper.readByteStringNullTerminated(buffer); ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer); - return new HttpRequest(uri, method); + HttpRequest req = new HttpRequest(uri, method); + req.setVersion(decodeVersion(buffer)); + req.setHeaders(decodeHeaders(buffer)); + return req; + } + + public HttpRequest decodeFullRequest(ByteBuff buffer) throws IOException { + HttpRequest req = decodeRequest(buffer); + req.setContent(decodeContent(buffer)); + return req; } public HttpResponse decodeResponse(ByteBuff buffer) throws IOException { int statusCode = dataHelper.readInt2(buffer); ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer); HttpStatus status = new HttpStatus(statusCode, reasonPhrase); - return new HttpResponse(status); + HttpResponse resp = new HttpResponse(status); + resp.setVersion(decodeVersion(buffer)); + resp.setHeaders(decodeHeaders(buffer)); + return resp; + } + + public HttpResponse decodeFullResponse(ByteBuff buffer) throws IOException { + HttpResponse resp = decodeResponse(buffer); + resp.setContent(decodeContent(buffer)); + return resp; } public HttpMessage decodeMessage(ByteBuff buffer) throws IOException { @@ -83,40 +141,42 @@ HttpMessage msg; if ((flags & FLAG_REQUEST) != 0) { - ByteString method = ncDataHelper.readByteStringNullTerminated(buffer); - ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer); - - msg = new HttpRequest(uri, method); + msg = decodeRequest(buffer); } else { - int statusCode = dataHelper.readInt2(buffer); - ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer); - - HttpStatus status = new HttpStatus(statusCode, reasonPhrase); - msg = new HttpResponse(status); + msg = decodeResponse(buffer); } - msg.setVersion(decodeVersion(buffer)); - msg.setHeaders(decodeHeaders(buffer)); return msg; } public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException { + return decodeMessages(buffer, false, false); + } + + public HttpReqResp decodeMessages(ByteBuff buffer, boolean skipRequest, boolean skipResponse) throws IOException { byte flags = buffer.read(); HttpRequest req = null; if ((flags & FLAG_REQUEST) != 0) { - + if (skipRequest) { + skipRequest(buffer); + } else { + req = decodeFullRequest(buffer); + } } HttpResponse resp = null; if ((flags & FLAG_RESPONSE) != 0) { - + if (skipResponse) { + skipResponse(buffer); + } else { + resp = decodeFullResponse(buffer); + } } return new HttpReqResp(req, resp); } public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException { - return null; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Nov 28 14:44:14 2017 +0100 @@ -166,21 +166,15 @@ if (respContent != null) { writer.writeSessionPayloadData(respContent); } - + writer.closeSessionPayloadBlock(); + size++; } public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException { long time = event.getTimestamp(); SessionInfo session = event.getSessionInfo(); - - if (event.getRequest() != null) { - encodeFullMessage(time, session, event.getRequest(), writer); - } - - if (event.getResponse() != null) { - encodeFullMessage(time, session, event.getResponse(), writer); - } + encodeFullMessages(time, session, event.getPayload(), writer); } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Tue Nov 28 14:44:14 2017 +0100 @@ -52,6 +52,17 @@ return buffer.toByteString(startIndex, startIndex + len); } + public void skipByteStringNullTerminated(ByteBuff buffer) { + skipByteStringTerminated(buffer, AsciiUtils.NUL); + } + + public void skipByteStringTerminated(ByteBuff buffer, byte delim) { + int endIndex = buffer.indexOf(delim); + if (endIndex != -1) { + buffer.skipBytes(endIndex + 1); + } + } + public ByteString readByteStringNullTerminated(ByteBuff buffer) { return readByteStringTerminated(buffer, AsciiUtils.NUL); }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Tue Nov 28 14:44:14 2017 +0100 @@ -7,12 +7,17 @@ import com.passus.data.ByteBuffDataSource; import com.passus.st.client.EventHandler; import com.passus.st.client.SessionStatusEvent; +import com.passus.st.client.http.HttpReqResp; +import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader; import com.passus.st.reader.nc.NcDataBlockReader; import com.passus.st.reader.nc.NcSessionPayloadBlock; import com.passus.st.reader.nc.NcSessionStatusBlock; +import java.io.File; import java.io.IOException; +import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,12 +40,23 @@ private EventHandler handler; - private HttpSessionPayloadEventDataReader httpReader; + private HttpSessionPayloadEventDataReader httpReader = new HttpSessionPayloadEventDataReader(); private String name; private ReaderThread readerThread; + public NcEventSource() { + } + + public NcEventSource(String ncFile) { + this.ncFile = ncFile; + } + + public NcEventSource(File file) { + this.ncFile = file.getAbsolutePath(); + } + @Override public String getType() { return TYPE; @@ -81,6 +97,10 @@ try { reader = new NcDataBlockReader(ncFile); reader.open(); + + readerThread = new ReaderThread(); + readerThread.start(); + started = true; } catch (Exception e) { stop0(); @@ -111,17 +131,29 @@ readerThread.join(5_000); } catch (InterruptedException ignore) { } + + readerThread = null; } started = false; } + public void join() throws InterruptedException { + if (readerThread != null) { + readerThread.join(); + } + } + private ByteBuff readPayload(Object data) { - if (data instanceof ByteBuffDataSource) { + if (data == null) { + return null; + } else if (data instanceof ByteBuff) { + return (ByteBuff) data; + } else if (data instanceof ByteBuffDataSource) { ByteBuffDataSource ds = (ByteBuffDataSource) data; return ds.getByteBuffer(); } else { - throw new IllegalArgumentException("Not supported data object."); + throw new IllegalArgumentException("Not supported data object '" + data.getClass() + "'."); } } @@ -143,10 +175,10 @@ break; case NcSessionPayloadBlock.TYPE: NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read(); + SessionInfo sessionInfo = payloadBlock.sessionInfo(); ByteBuff payload = readPayload(payloadBlock.data()); - - /*HttpMessage msg = httpReader.readMessage(payload); - handler.handle(event);*/ + HttpReqResp messages = httpReader.decodeMessages(payload); + handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName())); break; default: reader.read();
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java Tue Nov 28 14:44:14 2017 +0100 @@ -7,9 +7,10 @@ import com.passus.net.http.HttpRequestBuilder; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseBuilder; -import java.io.IOException; import org.testng.annotations.Test; import static com.passus.st.utils.HttpMessageAssert.*; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; /** * @@ -17,18 +18,37 @@ */ public class HttpSessionPayloadEventDataReaderTest { - private void encodeMessage(HttpMessage msg, ByteBuff buffer) throws IOException { - HttpSessionPayloadEventDataWriter writer = new HttpSessionPayloadEventDataWriter(); - writer.encodeMessage(msg, buffer); + private final HttpSessionPayloadEventDataWriter writer = new HttpSessionPayloadEventDataWriter(); + + private ByteBuff buffer = new HeapByteBuff(); + + private final HttpRequest req; + + private final HttpResponse resp; + + public HttpSessionPayloadEventDataReaderTest() { + req = HttpRequestBuilder.get("http://test.com/test") + .header("X-Header", "X-Header-Value") + .build(); + + resp = HttpResponseBuilder.ok() + .header("X-Header", "X-Header-Value") + .build(); + } + + @AfterMethod + public void afterMethod() { + buffer.clear(); + } + + @AfterClass + public void afterClass() { + buffer = null; } @Test - public void testDecodeMessage_Request() throws Exception { - ByteBuff buffer = new HeapByteBuff(); - HttpRequest req = HttpRequestBuilder.get("http://test.com/test") - .header("X-Header", "X-Header-Value") - .build(); - encodeMessage(req, buffer); + public void testDecode_Request() throws Exception { + writer.encodeMessage(req, buffer); HttpSessionPayloadEventDataReader reader = new HttpSessionPayloadEventDataReader(); HttpMessage msgDecoded = reader.decodeMessage(buffer); @@ -36,15 +56,16 @@ } @Test - public void testDecodeMessage_Response() throws Exception { - ByteBuff buffer = new HeapByteBuff(); - HttpResponse resp = HttpResponseBuilder.ok() - .header("X-Header", "X-Header-Value") - .build(); - encodeMessage(resp, buffer); + public void testDecode_Response() throws Exception { + writer.encodeMessage(resp, buffer); HttpSessionPayloadEventDataReader reader = new HttpSessionPayloadEventDataReader(); HttpMessage msgDecoded = reader.decodeMessage(buffer); assertMessages(resp, msgDecoded); } + + @Test + public void testDecode_Messages() throws Exception { + //writer.encodeFullMessages(req, resp, buffer); + } }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Tue Nov 28 14:44:14 2017 +0100 @@ -47,20 +47,19 @@ eventDst.stop(); List<NcDataBlock> blocks = NcDataBlockReaderUtils.readAll(tmpFile); - assertEquals(6, blocks.size()); + assertEquals(5, blocks.size()); assertEquals(NcSegmentBlock.TYPE, blocks.get(0).type()); assertEquals(NcSessionInfoBlock.TYPE, blocks.get(1).type()); NcSessionInfoBlock infoBlock = (NcSessionInfoBlock) blocks.get(1); assertEquals(sessionInfo, infoBlock.sessionInfo()); assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(2).type()); - assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(3).type()); - assertEquals(NcSessionStatusBlock.TYPE, blocks.get(4).type()); - NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(4); + assertEquals(NcSessionStatusBlock.TYPE, blocks.get(3).type()); + NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(3); assertEquals(sessionInfo, statusBlock.sessionInfo()); assertEquals(statusEvent.getStatus(), statusBlock.status()); - assertEquals(NcSegmentBlock.TYPE, blocks.get(5).type()); + assertEquals(NcSegmentBlock.TYPE, blocks.get(4).type()); } finally { tmpFile.delete(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Tue Nov 28 14:44:14 2017 +0100 @@ -0,0 +1,90 @@ +package com.passus.st.source; + +import static com.passus.commons.utils.ResourceUtils.createTmpFile; +import com.passus.st.client.ArrayListEventHandler; +import com.passus.st.client.DataEvents; +import com.passus.st.client.Event; +import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.utils.EventUtils; +import static com.passus.st.utils.HttpMessageAssert.assertMessages; +import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; +import static org.testng.AssertJUnit.*; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class NcEventSourceTest { + + private FileEvents writeEvents(String pcapFile) throws IOException { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + List<Event> events = EventUtils.readEvents(pcapFile, props); + + File tmpFile = createTmpFile(false); + NcEventDestination eventDst = new NcEventDestination(tmpFile); + eventDst.start(); + events.forEach(eventDst::handle); + eventDst.stop(); + + events = events.stream().filter((event) -> { + return (event.getType() != DataEvents.DataEnd.TYPE + && event.getType() != DataEvents.DataLoopEnd.TYPE); + }).collect(Collectors.toList()); + + return new FileEvents(tmpFile, events); + } + + @Test + public void testRead() throws Exception { + FileEvents fileEvents = writeEvents("pcap/http/http_req_resp.pcap"); + try { + ArrayListEventHandler handler = new ArrayListEventHandler(); + NcEventSource eventSource = new NcEventSource(fileEvents.ncFile); + eventSource.setHandler(handler); + eventSource.start(); + eventSource.join(); + + List<Event> events = handler.getEvents(); + assertEquals(fileEvents.events.size(), events.size()); + for (int i = 0; i < events.size(); i++) { + Event expectedEvent = fileEvents.events.get(i); + Event event = events.get(i); + + assertEquals(expectedEvent.getType(), event.getType()); + if (event.getType() == HttpSessionPayloadEvent.TYPE) { + HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent; + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; + + assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + } + } + + } finally { + fileEvents.ncFile.delete(); + } + } + + private class FileEvents { + + public final File ncFile; + + public final List<Event> events; + + public FileEvents(File ncFile, List<Event> events) { + this.ncFile = ncFile; + this.events = events; + } + + } +}
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Tue Nov 28 14:14:01 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Tue Nov 28 14:44:14 2017 +0100 @@ -68,7 +68,7 @@ } assertEquals(expectedResp.getStatus().getCode(), resp.getStatus().getCode()); - assertEquals(expectedResp.getStatus().getReasonPhrase(), resp.getStatus().getReasonPhrase()); + assertEquals(expectedResp.getStatus().getReasonPhrase().toString(), resp.getStatus().getReasonPhrase().toString()); assertEquals(expectedResp.getVersion(), resp.getVersion()); assertHeaders(expectedResp.getHeaders(), resp.getHeaders()); }