changeset 704:b5062e521212

NC file in progress
author Devel 2
date Tue, 28 Nov 2017 14:44:14 +0100
parents 68d12f09c3a1
children 122560887a6a
files stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java
diffstat 8 files changed, 260 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Tue Nov 28 14:44:14 2017 +0100
@@ -1,8 +1,11 @@
 package com.passus.st.reader.nc;
 
 import com.passus.data.ByteBuff;
+import com.passus.data.ByteBuffDataSource;
 import com.passus.data.ByteString;
 import com.passus.data.DataHelper;
+import com.passus.data.DataSource;
+import com.passus.data.HeapByteBuff;
 import com.passus.net.http.HttpConsts;
 import com.passus.net.http.HttpHeaders;
 import com.passus.net.http.HttpMessage;
@@ -26,6 +29,20 @@
 
     private final DataHelper dataHelper = DataHelper.BIG_ENDIAN;
 
+    private void skipRequest(ByteBuff buffer) throws IOException {
+        ncDataHelper.skipByteStringNullTerminated(buffer);
+        ncDataHelper.skipByteStringNullTerminated(buffer);
+        buffer.read();
+        skipHeaders(buffer);
+    }
+
+    private void skipResponse(ByteBuff buffer) throws IOException {
+        buffer.skipBytes(2);
+        ncDataHelper.skipByteStringNullTerminated(buffer);
+        buffer.read();
+        skipHeaders(buffer);
+    }
+
     public ByteString decodeVersion(ByteBuff buffer) throws IOException {
         byte b = buffer.read();
         if (b == NcHttpDataUtils.VERSION_1_0) {
@@ -37,6 +54,15 @@
         throw new IOException("Not supported HTTP version '" + b + "'.");
     }
 
+    private void skipHeaders(ByteBuff buffer) throws IOException {
+        long headerSize = ncDataHelper.readLongVLC(buffer);
+        if (headerSize <= Integer.MAX_VALUE) {
+            buffer.skipBytes((int) headerSize);
+        } else {
+            throw new IOException("Too big header size > Integer.MAX_VALUE");
+        }
+    }
+
     public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException {
         long headerSize = ncDataHelper.readLongVLC(buffer);
         int startIndex = buffer.startIndex();
@@ -65,17 +91,49 @@
         return headers;
     }
 
+    private DataSource decodeContent(ByteBuff buffer) throws IOException {
+        long contentSize = ncDataHelper.readLongVLC(buffer);
+        if (contentSize == 0) {
+            return null;
+        } else if (contentSize <= Integer.MAX_VALUE) {
+            int contentSizeInt = (int) contentSize;
+            ByteBuff data = new HeapByteBuff(contentSizeInt);
+            buffer.read(data, contentSizeInt);
+            return new ByteBuffDataSource(data);
+        } else {
+            throw new IOException("Too big content size > Integer.MAX_VALUE");
+        }
+    }
+
     public HttpRequest decodeRequest(ByteBuff buffer) throws IOException {
         ByteString method = ncDataHelper.readByteStringNullTerminated(buffer);
         ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer);
-        return new HttpRequest(uri, method);
+        HttpRequest req = new HttpRequest(uri, method);
+        req.setVersion(decodeVersion(buffer));
+        req.setHeaders(decodeHeaders(buffer));
+        return req;
+    }
+
+    public HttpRequest decodeFullRequest(ByteBuff buffer) throws IOException {
+        HttpRequest req = decodeRequest(buffer);
+        req.setContent(decodeContent(buffer));
+        return req;
     }
 
     public HttpResponse decodeResponse(ByteBuff buffer) throws IOException {
         int statusCode = dataHelper.readInt2(buffer);
         ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer);
         HttpStatus status = new HttpStatus(statusCode, reasonPhrase);
-        return new HttpResponse(status);
+        HttpResponse resp = new HttpResponse(status);
+        resp.setVersion(decodeVersion(buffer));
+        resp.setHeaders(decodeHeaders(buffer));
+        return resp;
+    }
+
+    public HttpResponse decodeFullResponse(ByteBuff buffer) throws IOException {
+        HttpResponse resp = decodeResponse(buffer);
+        resp.setContent(decodeContent(buffer));
+        return resp;
     }
 
     public HttpMessage decodeMessage(ByteBuff buffer) throws IOException {
@@ -83,40 +141,42 @@
 
         HttpMessage msg;
         if ((flags & FLAG_REQUEST) != 0) {
-            ByteString method = ncDataHelper.readByteStringNullTerminated(buffer);
-            ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer);
-
-            msg = new HttpRequest(uri, method);
+            msg = decodeRequest(buffer);
         } else {
-            int statusCode = dataHelper.readInt2(buffer);
-            ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer);
-
-            HttpStatus status = new HttpStatus(statusCode, reasonPhrase);
-            msg = new HttpResponse(status);
+            msg = decodeResponse(buffer);
         }
 
-        msg.setVersion(decodeVersion(buffer));
-        msg.setHeaders(decodeHeaders(buffer));
         return msg;
     }
 
     public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException {
+        return decodeMessages(buffer, false, false);
+    }
+
+    public HttpReqResp decodeMessages(ByteBuff buffer, boolean skipRequest, boolean skipResponse) throws IOException {
         byte flags = buffer.read();
         HttpRequest req = null;
         if ((flags & FLAG_REQUEST) != 0) {
-
+            if (skipRequest) {
+                skipRequest(buffer);
+            } else {
+                req = decodeFullRequest(buffer);
+            }
         }
 
         HttpResponse resp = null;
         if ((flags & FLAG_RESPONSE) != 0) {
-
+            if (skipResponse) {
+                skipResponse(buffer);
+            } else {
+                resp = decodeFullResponse(buffer);
+            }
         }
 
         return new HttpReqResp(req, resp);
     }
 
     public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException {
-
         return null;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Tue Nov 28 14:44:14 2017 +0100
@@ -166,21 +166,15 @@
         if (respContent != null) {
             writer.writeSessionPayloadData(respContent);
         }
-
+        writer.closeSessionPayloadBlock();
+        
         size++;
     }
 
     public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException {
         long time = event.getTimestamp();
         SessionInfo session = event.getSessionInfo();
-
-        if (event.getRequest() != null) {
-            encodeFullMessage(time, session, event.getRequest(), writer);
-        }
-
-        if (event.getResponse() != null) {
-            encodeFullMessage(time, session, event.getResponse(), writer);
-        }
+        encodeFullMessages(time, session, event.getPayload(), writer);
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Tue Nov 28 14:44:14 2017 +0100
@@ -52,6 +52,17 @@
         return buffer.toByteString(startIndex, startIndex + len);
     }
 
+    public void skipByteStringNullTerminated(ByteBuff buffer) {
+        skipByteStringTerminated(buffer, AsciiUtils.NUL);
+    }
+
+    public void skipByteStringTerminated(ByteBuff buffer, byte delim) {
+        int endIndex = buffer.indexOf(delim);
+        if (endIndex != -1) {
+            buffer.skipBytes(endIndex + 1);
+        }
+    }
+
     public ByteString readByteStringNullTerminated(ByteBuff buffer) {
         return readByteStringTerminated(buffer, AsciiUtils.NUL);
     }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Tue Nov 28 14:44:14 2017 +0100
@@ -7,12 +7,17 @@
 import com.passus.data.ByteBuffDataSource;
 import com.passus.st.client.EventHandler;
 import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.http.HttpReqResp;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
 import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader;
 import com.passus.st.reader.nc.NcDataBlockReader;
 import com.passus.st.reader.nc.NcSessionPayloadBlock;
 import com.passus.st.reader.nc.NcSessionStatusBlock;
+import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -35,12 +40,23 @@
 
     private EventHandler handler;
 
-    private HttpSessionPayloadEventDataReader httpReader;
+    private HttpSessionPayloadEventDataReader httpReader = new HttpSessionPayloadEventDataReader();
 
     private String name;
 
     private ReaderThread readerThread;
 
+    public NcEventSource() {
+    }
+
+    public NcEventSource(String ncFile) {
+        this.ncFile = ncFile;
+    }
+
+    public NcEventSource(File file) {
+        this.ncFile = file.getAbsolutePath();
+    }
+
     @Override
     public String getType() {
         return TYPE;
@@ -81,6 +97,10 @@
         try {
             reader = new NcDataBlockReader(ncFile);
             reader.open();
+
+            readerThread = new ReaderThread();
+            readerThread.start();
+
             started = true;
         } catch (Exception e) {
             stop0();
@@ -111,17 +131,29 @@
                 readerThread.join(5_000);
             } catch (InterruptedException ignore) {
             }
+
+            readerThread = null;
         }
 
         started = false;
     }
 
+    public void join() throws InterruptedException {
+        if (readerThread != null) {
+            readerThread.join();
+        }
+    }
+
     private ByteBuff readPayload(Object data) {
-        if (data instanceof ByteBuffDataSource) {
+        if (data == null) {
+            return null;
+        } else if (data instanceof ByteBuff) {
+            return (ByteBuff) data;
+        } else if (data instanceof ByteBuffDataSource) {
             ByteBuffDataSource ds = (ByteBuffDataSource) data;
             return ds.getByteBuffer();
         } else {
-            throw new IllegalArgumentException("Not supported data object.");
+            throw new IllegalArgumentException("Not supported data object '" + data.getClass() + "'.");
         }
     }
 
@@ -143,10 +175,10 @@
                             break;
                         case NcSessionPayloadBlock.TYPE:
                             NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
+                            SessionInfo sessionInfo = payloadBlock.sessionInfo();
                             ByteBuff payload = readPayload(payloadBlock.data());
-
-                            /*HttpMessage msg = httpReader.readMessage(payload);
-                    handler.handle(event);*/
+                            HttpReqResp messages = httpReader.decodeMessages(payload);
+                            handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName()));
                             break;
                         default:
                             reader.read();
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java	Tue Nov 28 14:44:14 2017 +0100
@@ -7,9 +7,10 @@
 import com.passus.net.http.HttpRequestBuilder;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseBuilder;
-import java.io.IOException;
 import org.testng.annotations.Test;
 import static com.passus.st.utils.HttpMessageAssert.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 
 /**
  *
@@ -17,18 +18,37 @@
  */
 public class HttpSessionPayloadEventDataReaderTest {
 
-    private void encodeMessage(HttpMessage msg, ByteBuff buffer) throws IOException {
-        HttpSessionPayloadEventDataWriter writer = new HttpSessionPayloadEventDataWriter();
-        writer.encodeMessage(msg, buffer);
+    private final HttpSessionPayloadEventDataWriter writer = new HttpSessionPayloadEventDataWriter();
+
+    private ByteBuff buffer = new HeapByteBuff();
+
+    private final HttpRequest req;
+
+    private final HttpResponse resp;
+
+    public HttpSessionPayloadEventDataReaderTest() {
+        req = HttpRequestBuilder.get("http://test.com/test")
+                .header("X-Header", "X-Header-Value")
+                .build();
+
+        resp = HttpResponseBuilder.ok()
+                .header("X-Header", "X-Header-Value")
+                .build();
+    }
+
+    @AfterMethod
+    public void afterMethod() {
+        buffer.clear();
+    }
+
+    @AfterClass
+    public void afterClass() {
+        buffer = null;
     }
 
     @Test
-    public void testDecodeMessage_Request() throws Exception {
-        ByteBuff buffer = new HeapByteBuff();
-        HttpRequest req = HttpRequestBuilder.get("http://test.com/test")
-                .header("X-Header", "X-Header-Value")
-                .build();
-        encodeMessage(req, buffer);
+    public void testDecode_Request() throws Exception {
+        writer.encodeMessage(req, buffer);
 
         HttpSessionPayloadEventDataReader reader = new HttpSessionPayloadEventDataReader();
         HttpMessage msgDecoded = reader.decodeMessage(buffer);
@@ -36,15 +56,16 @@
     }
 
     @Test
-    public void testDecodeMessage_Response() throws Exception {
-        ByteBuff buffer = new HeapByteBuff();
-        HttpResponse resp = HttpResponseBuilder.ok()
-                .header("X-Header", "X-Header-Value")
-                .build();
-        encodeMessage(resp, buffer);
+    public void testDecode_Response() throws Exception {
+        writer.encodeMessage(resp, buffer);
 
         HttpSessionPayloadEventDataReader reader = new HttpSessionPayloadEventDataReader();
         HttpMessage msgDecoded = reader.decodeMessage(buffer);
         assertMessages(resp, msgDecoded);
     }
+
+    @Test
+    public void testDecode_Messages() throws Exception {
+        //writer.encodeFullMessages(req, resp, buffer);
+    }
 }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Tue Nov 28 14:44:14 2017 +0100
@@ -47,20 +47,19 @@
             eventDst.stop();
 
             List<NcDataBlock> blocks = NcDataBlockReaderUtils.readAll(tmpFile);
-            assertEquals(6, blocks.size());
+            assertEquals(5, blocks.size());
             assertEquals(NcSegmentBlock.TYPE, blocks.get(0).type());
             assertEquals(NcSessionInfoBlock.TYPE, blocks.get(1).type());
             NcSessionInfoBlock infoBlock = (NcSessionInfoBlock) blocks.get(1);
             assertEquals(sessionInfo, infoBlock.sessionInfo());
 
             assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(2).type());
-            assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(3).type());
-            assertEquals(NcSessionStatusBlock.TYPE, blocks.get(4).type());
-            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(4);
+            assertEquals(NcSessionStatusBlock.TYPE, blocks.get(3).type());
+            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(3);
             assertEquals(sessionInfo, statusBlock.sessionInfo());
             assertEquals(statusEvent.getStatus(), statusBlock.status());
 
-            assertEquals(NcSegmentBlock.TYPE, blocks.get(5).type());
+            assertEquals(NcSegmentBlock.TYPE, blocks.get(4).type());
         } finally {
             tmpFile.delete();
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Tue Nov 28 14:44:14 2017 +0100
@@ -0,0 +1,90 @@
+package com.passus.st.source;
+
+import static com.passus.commons.utils.ResourceUtils.createTmpFile;
+import com.passus.st.client.ArrayListEventHandler;
+import com.passus.st.client.DataEvents;
+import com.passus.st.client.Event;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.utils.EventUtils;
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
+import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.Test;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class NcEventSourceTest {
+
+    private FileEvents writeEvents(String pcapFile) throws IOException {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents(pcapFile, props);
+
+        File tmpFile = createTmpFile(false);
+        NcEventDestination eventDst = new NcEventDestination(tmpFile);
+        eventDst.start();
+        events.forEach(eventDst::handle);
+        eventDst.stop();
+
+        events = events.stream().filter((event) -> {
+            return (event.getType() != DataEvents.DataEnd.TYPE
+                    && event.getType() != DataEvents.DataLoopEnd.TYPE);
+        }).collect(Collectors.toList());
+
+        return new FileEvents(tmpFile, events);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        FileEvents fileEvents = writeEvents("pcap/http/http_req_resp.pcap");
+        try {
+            ArrayListEventHandler handler = new ArrayListEventHandler();
+            NcEventSource eventSource = new NcEventSource(fileEvents.ncFile);
+            eventSource.setHandler(handler);
+            eventSource.start();
+            eventSource.join();
+
+            List<Event> events = handler.getEvents();
+            assertEquals(fileEvents.events.size(), events.size());
+            for (int i = 0; i < events.size(); i++) {
+                Event expectedEvent = fileEvents.events.get(i);
+                Event event = events.get(i);
+
+                assertEquals(expectedEvent.getType(), event.getType());
+                if (event.getType() == HttpSessionPayloadEvent.TYPE) {
+                    HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent;
+                    HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
+
+                    assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
+                    assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
+                    assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                    assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                }
+            }
+
+        } finally {
+            fileEvents.ncFile.delete();
+        }
+    }
+
+    private class FileEvents {
+
+        public final File ncFile;
+
+        public final List<Event> events;
+
+        public FileEvents(File ncFile, List<Event> events) {
+            this.ncFile = ncFile;
+            this.events = events;
+        }
+
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Tue Nov 28 14:14:01 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Tue Nov 28 14:44:14 2017 +0100
@@ -68,7 +68,7 @@
         }
 
         assertEquals(expectedResp.getStatus().getCode(), resp.getStatus().getCode());
-        assertEquals(expectedResp.getStatus().getReasonPhrase(), resp.getStatus().getReasonPhrase());
+        assertEquals(expectedResp.getStatus().getReasonPhrase().toString(), resp.getStatus().getReasonPhrase().toString());
         assertEquals(expectedResp.getVersion(), resp.getVersion());
         assertHeaders(expectedResp.getHeaders(), resp.getHeaders());
     }