changeset 698:f248f2e25d10

NC file in progress
author Devel 2
date Mon, 27 Nov 2017 14:23:02 +0100
parents 86e9b9d85b7f
children c23df9582e50
files stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java stress-tester/src/test/java/com/passus/st/utils/NcDataBlockReaderUtils.java
diffstat 10 files changed, 326 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Mon Nov 27 14:23:02 2017 +0100
@@ -87,7 +87,7 @@
         return size;
     }
 
-    private void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException {
+    public void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException {
         ByteBuff buffer = new HeapByteBuff();
         long size = encodeMessage(msg, buffer);
         DataSource content = msg.getContent();
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Mon Nov 27 14:23:02 2017 +0100
@@ -348,11 +348,11 @@
     @Override
     public NcDataBlock read() throws IOException {
         checkOpened();
+        read(1);
         if (eof && buffer.isEmpty()) {
             return null;
         }
 
-        read(1);
         byte blockType = buffer.read();
         switch (blockType) {
             case NcSegmentBlock.TYPE:
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 27 14:23:02 2017 +0100
@@ -374,7 +374,7 @@
 
         ByteBuffer dataBuffer = wrapData(data);
         int dataSize = dataBuffer.remaining();
-        write(dataBuffer);
+        ch.write(dataBuffer);
         updateSegmentInfo(dataSize);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Mon Nov 27 14:23:02 2017 +0100
@@ -12,6 +12,7 @@
 import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.reader.nc.HttpSessionPayloadEventDataWriter;
 import com.passus.st.reader.nc.NcDataBlockWriter;
+import java.io.File;
 import java.io.IOException;
 
 /**
@@ -30,15 +31,42 @@
 
     private final HttpResponseEncoder responseEncoder = HttpResponseEncoder.getInstance();
 
-    private HttpSessionPayloadEventDataWriter httpPayloadWriter = new HttpSessionPayloadEventDataWriter();
+    private final HttpSessionPayloadEventDataWriter httpPayloadWriter = new HttpSessionPayloadEventDataWriter();
 
-    private boolean encodeData;
+    private boolean encodeData = true;
+
+    public NcEventDestination() { // assigns nothing; the file can be supplied through setNcFile
+    }
+    /** Creates a destination for the NC file at the given path; the path is stored as given, without validation. */
+    public NcEventDestination(String ncFile) {
+        this.ncFile = ncFile;
+    }
+    /** Convenience overload that delegates with the file's absolute path; a null file fails with NullPointerException. */
+    public NcEventDestination(File file) {
+        this(file.getAbsolutePath());
+    }
+
+    public String getNcFile() { // returns the ncFile field exactly as stored
+        return ncFile;
+    }
+
+    public void setNcFile(String ncFile) { // plain assignment; the path is not validated here
+        this.ncFile = ncFile;
+    }
 
     @Override
     public boolean isStarted() {
         return started;
     }
 
+    public boolean isEncodeData() { // when true, HTTP payload events go through writeEncoded, otherwise writeNotEncoded
+        return encodeData;
+    }
+
+    public void setEncodeData(boolean encodeData) { // selects writeEncoded (true) or writeNotEncoded (false) for payload events
+        this.encodeData = encodeData;
+    }
+
     @Override
     public void start() {
         if (started) {
@@ -117,17 +145,12 @@
                 case HttpSessionPayloadEvent.TYPE:
                     HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
 
-                    if (payloadEvent.getRequest() != null) {
-                        ByteBuff buff = new HeapByteBuff();
-                        requestEncoder.encode(payloadEvent.getRequest(), buff);
-                        writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff);
+                    if (encodeData) {
+                        writeEncoded(payloadEvent);
+                    } else {
+                        writeNotEncoded(payloadEvent);
                     }
 
-                    if (payloadEvent.getResponse() != null) {
-                        ByteBuff buff = new HeapByteBuff();
-                        responseEncoder.encode(payloadEvent.getResponse(), buff);
-                        writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff);
-                    }
                     break;
                 case DataEnd.TYPE:
                     stop();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,137 @@
+package com.passus.st.source;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.commons.service.ServiceException;
+import com.passus.data.ByteBuff;
+import com.passus.data.ByteBuffDataSource;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.session.SessionKey;
+import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.plugin.PluginConstants;
+import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader;
+import com.passus.st.reader.nc.NcDataBlockReader;
+import com.passus.st.reader.nc.NcSessionPayloadBlock;
+import com.passus.st.reader.nc.NcSessionStatusBlock;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Event source plugin backed by an NC file read through NcDataBlockReader (work in progress).
+ * @author Mirosław Hawrot
+ */
+@Plugin(name = NcEventSource.TYPE, category = PluginConstants.CATEGORY_EVENT_SOURCE)
+public class NcEventSource implements EventSource {
+
+    public static final String TYPE = "nc";
+
+    private String ncFile; // path handed to NcDataBlockReader in start(); NOTE(review): never assigned in this class yet
+
+    private NcDataBlockReader reader;
+
+    private volatile boolean started;
+
+    private EventHandler handler;
+
+    private HttpSessionPayloadEventDataReader httpReader; // only referenced by the disabled code in read()
+
+    private String name;
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public EventHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void setHandler(EventHandler handler) {
+        Assert.notNull(handler, "handler");
+        this.handler = handler;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+    /** Opens a reader on ncFile; no-op when already started. On failure closes the reader and throws ServiceException. */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+        // The started flag is raised only after open() succeeds.
+        try {
+            reader = new NcDataBlockReader(ncFile);
+            reader.open();
+            started = true;
+        } catch (Exception e) {
+            stop0();
+            throw new ServiceException("Unable to start NcEventSource. " + e.getMessage(), e);
+        }
+    }
+    /** Closes the reader if the source is started; otherwise does nothing. */
+    @Override
+    public void stop() {
+        if (started) {
+            stop0();
+        }
+    }
+    /** Closes the reader when one exists, ignoring close errors, and clears the started flag. */
+    private void stop0() {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (Exception ignore) { // best-effort close; a failure here is deliberately ignored
+            }
+        }
+
+        started = false;
+    }
+    /** Returns the buffer of a ByteBuffDataSource; any other object is rejected with IllegalArgumentException. */
+    private ByteBuff readPayload(Object data) {
+        if (data instanceof ByteBuffDataSource) {
+            ByteBuffDataSource ds = (ByteBuffDataSource) data;
+            return ds.getByteBuffer();
+        } else {
+            throw new IllegalArgumentException("Not supported data object.");
+        }
+    }
+    /** Reads blocks until reader.eof() and hands status blocks to the handler. NOTE(review): nothing in this class calls it yet. */
+    private void read() throws IOException {
+        while (!reader.eof()) {
+            byte blockType = reader.peekBlockType();
+            switch (blockType) {
+                case NcSessionStatusBlock.TYPE:
+                    NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
+                    SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
+                    handler.handle(event);
+                    break;
+                case NcSessionPayloadBlock.TYPE:
+                    NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
+                    ByteBuff payload = readPayload(payloadBlock.data());
+                    // TODO: the payload is extracted but not yet decoded or dispatched (see the disabled code below).
+                    /*HttpMessage msg = httpReader.readMessage(payload);
+                    handler.handle(event);*/
+                    break;
+                default:
+                    reader.read(); // consume and discard block types this source does not handle
+            }
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Mon Nov 27 14:23:02 2017 +0100
@@ -1,5 +1,7 @@
 package com.passus.st.reader.nc;
 
+import com.passus.data.HeapByteBuff;
+import com.passus.net.http.HttpHeaders;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpRequestBuilder;
 import com.passus.st.client.http.HttpSessionPayloadEvent;
@@ -38,6 +40,18 @@
     }
 
     @Test
+    public void testEncodeHeaders() throws Exception {
+        HttpHeaders headers = new HttpHeaders();
+        headers.add(HttpHeaders.HOST, "test");
+        headers.add("X-HEADER", "X-HEADER-VALUE");
+        HeapByteBuff buffer = new HeapByteBuff();
+        // Encode both headers and check only the resulting byte count, not the byte layout.
+        HttpSessionPayloadEventDataWriter payloadWriter = new HttpSessionPayloadEventDataWriter();
+        payloadWriter.encodeHeaders(headers, buffer);
+        assertEquals(33, buffer.readableBytes()); // NOTE(review): 33 is presumably the encoded size of these two headers; confirm against the encoder format
+    }
+    
+    @Test
     public void testWrite_HttpRequest() throws Exception {
         File file = createTmpFile();
         try {
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Mon Nov 27 14:23:02 2017 +0100
@@ -23,6 +23,7 @@
 
     public NcDataBlockReaderTest() throws ParseException {
         session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+        session.setSourceName("test");
     }
 
     private File createTmpFile() throws IOException {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java	Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,67 @@
+package com.passus.st.source;
+
+import static com.passus.commons.utils.ResourceUtils.createTmpFile;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.reader.nc.NcDataBlock;
+import com.passus.st.reader.nc.NcSegmentBlock;
+import com.passus.st.reader.nc.NcSessionInfoBlock;
+import com.passus.st.reader.nc.NcSessionPayloadBlock;
+import com.passus.st.reader.nc.NcSessionStatusBlock;
+import com.passus.st.utils.EventUtils;
+import com.passus.st.utils.NcDataBlockReaderUtils;
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that NcEventDestination writes events read from a pcap file as NC data blocks.
+ * @author Mirosław Hawrot
+ */
+public class NcEventDestinationTest {
+
+    @Test
+    public void testHandle() throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) events.get(0);
+        SessionStatusEvent statusEvent = (SessionStatusEvent) events.get(1);
+        SessionInfo sessionInfo = payloadEvent.getSessionInfo();
+
+        File tmpFile = createTmpFile(false);
+        NcEventDestination eventDst = new NcEventDestination(tmpFile);
+        try {
+            eventDst.start();
+            events.forEach(eventDst::handle);
+            eventDst.stop(); // stop first, then read the file back
+            List<NcDataBlock> blocks = NcDataBlockReaderUtils.readAll(tmpFile);
+            assertEquals(6, blocks.size());
+            assertEquals(NcSegmentBlock.TYPE, blocks.get(0).type());
+            assertEquals(NcSessionInfoBlock.TYPE, blocks.get(1).type());
+            NcSessionInfoBlock infoBlock = (NcSessionInfoBlock) blocks.get(1);
+            assertEquals(sessionInfo, infoBlock.sessionInfo());
+            assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(2).type());
+            assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(3).type());
+            assertEquals(NcSessionStatusBlock.TYPE, blocks.get(4).type());
+            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(4);
+            assertEquals(sessionInfo, statusBlock.sessionInfo());
+            assertEquals(statusEvent.getStatus(), statusBlock.status());
+            assertEquals(NcSegmentBlock.TYPE, blocks.get(5).type());
+        } finally {
+            if (eventDst.isStarted()) { // start()/handle() failed before the regular stop() above
+                eventDst.stop();
+            }
+            tmpFile.delete();
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Mon Nov 27 13:33:13 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java	Mon Nov 27 14:23:02 2017 +0100
@@ -1,6 +1,8 @@
 package com.passus.st.utils;
 
+import com.passus.data.ByteBuff;
 import com.passus.data.ByteString;
+import com.passus.data.HeapByteBuff;
 import com.passus.net.http.HttpHeaders;
 import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpRequest;
@@ -32,6 +34,23 @@
         }
     }
 
+    public static void assertMessagesContent(HttpMessage expectedMsg, HttpMessage msg) {
+        // Same reference (including both null): nothing to compare.
+        if (expectedMsg != msg) {
+            try {
+                // Write each message's content into its own buffer, then compare the buffers.
+                ByteBuff expectedContent = new HeapByteBuff();
+                ByteBuff actualContent = new HeapByteBuff();
+                expectedMsg.getContent().write(expectedContent);
+                msg.getContent().write(actualContent);
+                assertEquals(expectedContent, actualContent);
+            } catch (Exception cause) {
+                // Any exception is rethrown as a RuntimeException carrying the original as its cause.
+                throw new RuntimeException(cause.getMessage(), cause);
+            }
+        }
+    }
+
     public static void assertRequests(HttpRequest expectedReq, HttpRequest req) {
         if (expectedReq == req) {
             return;
@@ -74,4 +93,5 @@
             assertEquals(values2, values1);
         }
     }
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/NcDataBlockReaderUtils.java	Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,50 @@
+package com.passus.st.utils;
+
+import com.passus.st.reader.nc.NcDataBlock;
+import com.passus.st.reader.nc.NcDataBlockReader;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * Test helpers that read the blocks of an NC file into a list.
+ * @author Mirosław Hawrot
+ */
+public class NcDataBlockReaderUtils {
+
+    private NcDataBlockReaderUtils() {
+    }
+
+    public static List<NcDataBlock> readAll(File file) throws IOException {
+        return readAll(file, null);
+    }
+
+    public static List<NcDataBlock> readAll(String fileName) throws IOException {
+        return readAll(fileName, null);
+    }
+
+    public static List<NcDataBlock> readAll(File file, Predicate<NcDataBlock> filter) throws IOException {
+        return readAll(file.getAbsolutePath(), filter);
+    }
+
+    /** Reads all blocks of the named file, keeping those accepted by {@code filter} (every block when it is null). */
+    public static List<NcDataBlock> readAll(String fileName, Predicate<NcDataBlock> filter) throws IOException {
+        // A null filter accepts every block.
+        Predicate<NcDataBlock> accept = filter != null ? filter : b -> true;
+        List<NcDataBlock> result = new ArrayList<>();
+        try (NcDataBlockReader reader = new NcDataBlockReader(fileName)) {
+            reader.open();
+            // The loop ends when read() returns null.
+            for (NcDataBlock next = reader.read(); next != null; next = reader.read()) {
+                if (accept.test(next)) {
+                    result.add(next);
+                }
+            }
+        }
+
+        return result;
+    }
+
+}