changeset 700:7955b2096163

NC file in progress
author Devel 2
date Tue, 28 Nov 2017 11:46:55 +0100
parents c23df9582e50
children be2f53a20274
files stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoders.java stress-tester/src/main/java/com/passus/st/reader/nc/option/VarOptionCoder.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java
diffstat 18 files changed, 282 insertions(+), 180 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Tue Nov 28 11:46:55 2017 +0100
@@ -3,7 +3,7 @@
 import com.passus.net.http.HttpMessageHelper;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
-import com.passus.st.client.http.HttpSessionPayloadEvent.HttpReqResp;
+import com.passus.st.client.http.HttpReqResp;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 
@@ -43,26 +43,6 @@
         return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName());
     }
 
-    public static class HttpReqResp {
-
-        private final HttpRequest request;
-
-        private final HttpResponse response;
-
-        public HttpReqResp(HttpRequest request, HttpResponse response) {
-            this.request = request;
-            this.response = response;
-        }
-
-        public HttpRequest getRequest() {
-            return request;
-        }
-
-        public HttpResponse getResponse() {
-            return response;
-        }
-
-    }
 
     @Override
     public String toString() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Tue Nov 28 11:46:55 2017 +0100
@@ -9,8 +9,11 @@
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpStatus;
+import com.passus.st.client.http.HttpReqResp;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
 import static com.passus.st.reader.nc.NcHttpDataUtils.CUSTOM_HEADER_CODE;
 import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST;
+import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE;
 import java.io.IOException;
 
 /**
@@ -23,8 +26,8 @@
 
     private final DataHelper dataHelper = DataHelper.BIG_ENDIAN;
 
-    private ByteString decodeVersion(ByteBuff buff) throws IOException {
-        byte b = buff.read();
+    public ByteString decodeVersion(ByteBuff buffer) throws IOException {
+        byte b = buffer.read();
         if (b == NcHttpDataUtils.VERSION_1_0) {
             return HttpConsts.VERSION_1_0;
         } else if (b == NcHttpDataUtils.VERSION_1_1) {
@@ -34,7 +37,7 @@
         throw new IOException("Not supported HTTP version '" + b + "'.");
     }
 
-    private HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException {
+    public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException {
         long headerSize = ncDataHelper.readLongVLC(buffer);
         int startIndex = buffer.startIndex();
 
@@ -62,6 +65,19 @@
         return headers;
     }
 
+    public HttpRequest decodeRequest(ByteBuff buffer) throws IOException {
+        ByteString method = ncDataHelper.readByteStringNullTerminated(buffer);
+        ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer);
+        return new HttpRequest(uri, method);
+    }
+
+    public HttpResponse decodeResponse(ByteBuff buffer) throws IOException {
+        int statusCode = dataHelper.readInt2(buffer);
+        ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer);
+        HttpStatus status = new HttpStatus(statusCode, reasonPhrase);
+        return new HttpResponse(status);
+    }
+
     public HttpMessage decodeMessage(ByteBuff buffer) throws IOException {
         byte flags = buffer.read();
 
@@ -84,7 +100,23 @@
         return msg;
     }
 
-    private HttpMessage read(NcDataBlockReader reader) throws IOException {
+    public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException {
+        byte flags = buffer.read();
+        HttpRequest req = null;
+        if ((flags & FLAG_REQUEST) != 0) {
+
+        }
+
+        HttpResponse resp = null;
+        if ((flags & FLAG_RESPONSE) != 0) {
+
+        }
+
+        return new HttpReqResp(req, resp);
+    }
+
+    public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException {
+
         return null;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Tue Nov 28 11:46:55 2017 +0100
@@ -10,9 +10,11 @@
 import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
+import com.passus.st.client.http.HttpReqResp;
 import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.emitter.SessionInfo;
 import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST;
+import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE;
 import java.io.IOException;
 import java.nio.ByteOrder;
 
@@ -32,21 +34,21 @@
             int code = NcHttpDataUtils.headerToCode(entry.getName());
             if (code == -1) {
                 headersBuff.append(NcHttpDataUtils.CUSTOM_HEADER_CODE);
-                ncDataHelper.writeByteStringNullTerminated(entry.getName(), headersBuff);
+                ncDataHelper.writeByteStringNullTerminated(headersBuff, entry.getName());
             } else {
                 headersBuff.append((byte) code);
             }
 
-            ncDataHelper.writeByteStringNullTerminated(entry.getValue(), headersBuff);
+            ncDataHelper.writeByteStringNullTerminated(headersBuff, entry.getValue());
         }
 
         int len = headersBuff.readableBytes();
-        len += ncDataHelper.writeLongVLC(headersBuff.readableBytes(), buff);
+        len += ncDataHelper.writeLongVLC(buff, headersBuff.readableBytes());
         buff.append(headersBuff);
         return len;
     }
 
-    public void encodeVersion(HttpMessage msg, ByteBuff buff) throws IOException {
+    public long encodeVersion(HttpMessage msg, ByteBuff buff) throws IOException {
         if (HttpConsts.VERSION_1_0.equals(msg.getVersion())) {
             buff.append(NcHttpDataUtils.VERSION_1_0);
         } else if (HttpConsts.VERSION_1_1.equals(msg.getVersion())) {
@@ -54,36 +56,55 @@
         } else {
             throw new IOException("Not supported HTTP version '" + msg.getVersion() + "'.");
         }
+
+        return 1;
+    }
+
+    public long encodeRequest(HttpRequest req, ByteBuff buff) throws IOException {
+        long size = 0;
+        size += ncDataHelper.writeByteStringNullTerminated(buff, req.getMethod().toByteString());
+        size += ncDataHelper.writeByteStringNullTerminated(buff, req.getUri());
+        size += encodeVersion(req, buff);
+        size += encodeHeaders(req.getHeaders(), buff);
+        return size;
+    }
+
+    public long encodeResponse(HttpResponse resp, ByteBuff buff) throws IOException {
+        long size = 0;
+        dataHelper.writeInt2(buff, resp.getStatus().getCode());
+        size += 2;
+        size += ncDataHelper.writeByteStringNullTerminated(buff, resp.getStatus().getReasonPhrase());
+        size += encodeVersion(resp, buff);
+        size += encodeHeaders(resp.getHeaders(), buff);
+        return size;
     }
 
     public long encodeMessage(HttpMessage msg, ByteBuff buff) throws IOException {
+        return encodeMessage(msg, buff, true);
+    }
+
+    public long encodeMessage(HttpMessage msg, ByteBuff buff, boolean appendFlags) throws IOException {
         long size = 0;
-        byte flags = 0;
         if (msg.isRequest()) {
-            flags = FLAG_REQUEST;
             HttpRequest req = (HttpRequest) msg;
 
-            buff.append(flags);
-            size++;
-            size += ncDataHelper.writeByteStringNullTerminated(req.getMethod().toByteString(), buff);
-            size += ncDataHelper.writeByteStringNullTerminated(req.getUri(), buff);
+            if (appendFlags) {
+                buff.append(FLAG_REQUEST);
+                size++;
+            }
 
-            encodeVersion(msg, buff);
-            size++;
+            size += encodeRequest(req, buff);
         } else {
             HttpResponse resp = (HttpResponse) msg;
 
-            buff.append(flags);
-            size++;
-            dataHelper.writeInt2(buff, resp.getStatus().getCode());
-            size += 2;
-            size += ncDataHelper.writeByteStringNullTerminated(resp.getStatus().getReasonPhrase(), buff);
+            if (appendFlags) {
+                buff.append(FLAG_RESPONSE);
+                size++;
+            }
 
-            encodeVersion(msg, buff);
-            size++;
+            size += encodeResponse(resp, buff);
         }
 
-        size += encodeHeaders(msg.getHeaders(), buff);
         return size;
     }
 
@@ -93,6 +114,7 @@
         DataSource content = msg.getContent();
         if (content != null) {
             size += content.available();
+            size += ncDataHelper.writeLongVLC(buffer, content.available());
         }
 
         writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size);
@@ -104,6 +126,50 @@
         writer.closeSessionPayloadBlock();
     }
 
+    public void encodeFullMessages(long timestamp, SessionInfo session, HttpReqResp messages, NcDataBlockWriter writer) throws IOException {
+        ByteBuff reqBuffer = new HeapByteBuff();
+        ByteBuff respBuffer = new HeapByteBuff();
+        byte flags = 0;
+        long size = 1;
+        HttpRequest req = messages.getRequest();
+        DataSource reqContent = null;
+        if (req != null) {
+            flags |= FLAG_REQUEST;
+            size += encodeRequest(req, reqBuffer);
+            reqContent = req.getContent();
+            if (reqContent != null) {
+                size += reqContent.available();
+                size += ncDataHelper.writeLongVLC(reqBuffer, reqContent.available());
+            }
+        }
+
+        HttpResponse resp = messages.getResponse();
+        DataSource respContent = null;
+        if (resp != null) {
+            flags |= FLAG_RESPONSE;
+            size += encodeResponse(resp, respBuffer);
+            respContent = resp.getContent();
+            if (respContent != null) {
+                size += respContent.available();
+                size += ncDataHelper.writeLongVLC(respBuffer, respContent.available());
+            }
+        }
+
+        writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size);
+        writer.writeSessionPayloadData(new byte[]{flags});
+        writer.writeSessionPayloadData(reqBuffer);
+        if (reqContent != null) {
+            writer.writeSessionPayloadData(reqContent);
+        }
+
+        writer.writeSessionPayloadData(respBuffer);
+        if (respContent != null) {
+            writer.writeSessionPayloadData(respContent);
+        }
+
+        size++;
+    }
+
     public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException {
         long time = event.getTimestamp();
         SessionInfo session = event.getSessionInfo();
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java	Tue Nov 28 11:46:55 2017 +0100
@@ -6,6 +6,8 @@
  */
 public interface NcDataBlock {
 
+    public static final int HEADER_SIZE = 1;
+    
     public byte type();
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Tue Nov 28 11:46:55 2017 +0100
@@ -2,7 +2,6 @@
 
 import com.passus.commons.Assert;
 import com.passus.data.ByteBuff;
-import com.passus.data.DataHelper;
 import com.passus.data.HeapByteBuff;
 import com.passus.net.IpAddress;
 import com.passus.net.session.SessionBean;
@@ -32,8 +31,6 @@
 
     private NcHeader header;
 
-    private final DataHelper dataHelper = DataHelper.BIG_ENDIAN;
-
     private final NcDataHelper ncDataHelper = NcDataHelper.getInstance();
 
     private final Path path;
@@ -216,9 +213,9 @@
             checkBlockType(blockType, NcSegmentBlock.TYPE);
         }
 
-        long totalSize = dataHelper.readLong8(buffer);
-        long prevTotalSize = dataHelper.readLong8(buffer);
-        int blocksNum = dataHelper.readInt2(buffer);
+        long totalSize = ncDataHelper.readLong8(buffer);
+        long prevTotalSize = ncDataHelper.readLong8(buffer);
+        int blocksNum = ncDataHelper.readInt2(buffer);
         return new NcSegmentBlock(totalSize, prevTotalSize, blocksNum);
     }
 
@@ -230,8 +227,8 @@
             checkBlockType(blockType, NcSessionInfoBlock.TYPE);
         }
 
-        int totalSize = dataHelper.readInt4(buffer);
-        int sessionId = dataHelper.readInt4(buffer);
+        int totalSize = ncDataHelper.readInt4(buffer);
+        int sessionId = ncDataHelper.readInt4(buffer);
         String sourceName = ncDataHelper.readStringNullTerminated(buffer);
         byte transport = buffer.read();
         if (transport != SessionBean.PROTOCOL_TCP && transport != SessionBean.PROTOCOL_UDP) {
@@ -239,9 +236,9 @@
         }
 
         IpAddress clientIp = ncDataHelper.readIpAddress(buffer);
-        int clientPort = dataHelper.readInt2(buffer);
+        int clientPort = ncDataHelper.readInt2(buffer);
         IpAddress serverIp = ncDataHelper.readIpAddress(buffer);
-        int serverPort = dataHelper.readInt2(buffer);
+        int serverPort = ncDataHelper.readInt2(buffer);
 
         SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport);
         sessionInfo.setSourceName(sourceName);
@@ -260,8 +257,8 @@
             checkBlockType(blockType, NcSessionStatusBlock.TYPE);
         }
 
-        long timestamp = dataHelper.readLong8(buffer);
-        int sessionId = dataHelper.readInt4(buffer);
+        long timestamp = ncDataHelper.readLong8(buffer);
+        int sessionId = ncDataHelper.readInt4(buffer);
         SessionInfo session = sessionIdMap.get(sessionId);
         if (session == null) {
             throw new IOException("Invalid session id '" + sessionId + "'.");
@@ -272,25 +269,25 @@
     }
 
     private NcSessionPayloadBlock readSessionPayloadBlockHeader(boolean skipBlockType) throws IOException {
-        read(NcSessionPayloadBlock.MAX_HEADER_SIZE);
+        read(NcSessionPayloadBlock.HEADER_SIZE);
 
         if (!skipBlockType) {
             byte blockType = buffer.read();
             checkBlockType(blockType, NcSessionPayloadBlock.TYPE);
         }
 
-        long timestamp = dataHelper.readLong8(buffer);
-        int sessionId = dataHelper.readInt4(buffer);
+        long timestamp = ncDataHelper.readLong8(buffer);
+        int sessionId = ncDataHelper.readInt4(buffer);
         SessionInfo session = sessionIdMap.get(sessionId);
         if (session == null) {
             throw new IOException("Invalid session id '" + sessionId + "'.");
         }
-        //long totalSize = dataHelper.readLong8(buffer);
-        int proto = dataHelper.readInt2(buffer);
-        long dataLength = ncDataHelper.readLongVLC(buffer);
-        long optionsLength = ncDataHelper.readLongVLC(buffer);
 
-        return new NcSessionPayloadBlock(timestamp, sessionId, session, -1, proto, dataLength, optionsLength, null, null);
+        long totalSize = ncDataHelper.readLong6(buffer);
+        long dataSize = ncDataHelper.readLong6(buffer);
+        int proto = ncDataHelper.readInt2(buffer);
+
+        return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, dataSize, proto, null, null);
     }
 
     private int readSessionPayloadBlockContent(ByteBuff data, int length) throws IOException {
@@ -311,21 +308,21 @@
         NcSessionPayloadBlock payloadBlock = readSessionPayloadBlockHeader(skipBlockType);
 
         if (!skipData) {
-            if (payloadBlock.dataLength() <= Integer.MAX_VALUE) {
-                int dataLength = (int) payloadBlock.dataLength();
-                ByteBuff data = new HeapByteBuff(dataLength);
-                readSessionPayloadBlockContent(data, dataLength);
+            if (payloadBlock.dataSize() <= Integer.MAX_VALUE) {
+                int dataSize = (int) payloadBlock.dataSize();
+                ByteBuff data = new HeapByteBuff(dataSize);
+                readSessionPayloadBlockContent(data, dataSize);
                 payloadBlock.data(data);
             } else {
                 long startPosition = position();
-                long endPosition = startPosition + payloadBlock.dataLength();
+                long endPosition = startPosition + payloadBlock.dataSize();
                 ReadonlyFileChannelDataSource ds = new ReadonlyFileChannelDataSource(ch, path, startPosition, endPosition);
                 payloadBlock.data(ds);
             }
         }
 
         Map<String, Object> options = null;
-        if (!skipOptions && payloadBlock.optionsLength() != 0) {
+        if (!skipOptions && payloadBlock.optionsSize() != 0) {
             throw new RuntimeException("optionsLength != 0 - not implemented yet");
         }
 
@@ -341,8 +338,7 @@
     }
 
     public int readSessionPayloadContent(ByteBuffer data, long length) {
-
-        return 0;
+        throw new RuntimeException("Not implemented yet.");
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Nov 28 11:46:55 2017 +0100
@@ -3,6 +3,7 @@
 import com.passus.commons.Assert;
 import com.passus.data.ByteBuff;
 import com.passus.data.ByteBuffDataSource;
+import com.passus.data.DataHelper;
 import com.passus.data.DataSource;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.DataBlockWriter;
@@ -283,11 +284,11 @@
         int sizePosition = buffer.position();
         buffer.putInt(0);
         buffer.putInt(sessionId);
-        size += ncDataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer);
+        size += ncDataHelper.writeStringNullTerminated(buffer, sessionInfo.getSourceName());
         buffer.put((byte) sessionInfo.getTransport());
-        size += ncDataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer);
+        size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getSrcIp());
         buffer.putShort((short) sessionInfo.getSrcPort());
-        size += ncDataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer);
+        size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp());
         buffer.putShort((short) sessionInfo.getDstPort());
         closeBlock(NcSessionInfoBlock.TYPE, false);
         buffer.putInt(sizePosition, size);
@@ -313,7 +314,7 @@
         updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE);
     }
 
-    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataLength) throws IOException {
+    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataSize) throws IOException {
         checkOpened();
 
         int sessionId = getSessionId(session, opened);
@@ -322,16 +323,11 @@
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
 
-        //int totalSizePosition = buffer.position();
-        //buffer.putLong(0L);
+        ncDataHelper.writeLong6(buffer, NcSessionPayloadBlock.HEADER_SIZE + dataSize);
+        ncDataHelper.writeLong6(buffer, dataSize);
         buffer.putShort((short) proto);
 
-        int size = 18;
-        size += ncDataHelper.writeLongVLC(dataLength, buffer);
-        size += ncDataHelper.writeLongVLC(0, buffer); // OptionsLength
-        //buffer.putLong(totalSizePosition, size);
-
-        updateSegmentInfo(size);
+        updateSegmentInfo(NcSessionPayloadBlock.HEADER_SIZE);
         writeBuffer();
         currentBlockStage = BLOCK_STAGE_CONTENT_WRITE;
     }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Tue Nov 28 11:46:55 2017 +0100
@@ -4,6 +4,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.data.ByteBuffUtils;
 import com.passus.data.ByteString;
+import com.passus.data.DataHelper.BigEndianDataHelper;
 import com.passus.data.DataUtils;
 import com.passus.net.Ip4Address;
 import com.passus.net.Ip6Address;
@@ -17,7 +18,7 @@
  *
  * @author Mirosław Hawrot
  */
-public class NcDataHelper {
+public class NcDataHelper extends BigEndianDataHelper {
 
     public static final byte VERSION_MAJOR = 1;
 
@@ -98,27 +99,27 @@
         return readed;
     }
 
-    public int writeIntVLC(int value, ByteBuff buffer) {
-        int len = writeIntVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity());
+    public int writeIntVLC(ByteBuff buffer, int value) {
+        int len = writeIntVLC(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value);
         buffer.endIndex(buffer.endIndex() + len);
         return len;
     }
 
-    public int writeIntVLC(int value, ByteBuffer buffer) {
-        int len = writeIntVLC(value, buffer.array(), buffer.position(), buffer.limit());
+    public int writeIntVLC(ByteBuffer buffer, int value) {
+        int len = writeIntVLC(buffer.array(), buffer.position(), buffer.limit(), value);
         buffer.position(buffer.position() + len);
         return len;
     }
 
-    public int writeIntVLC(int value, byte[] data) {
-        return writeIntVLC(value, data, 0, data.length);
+    public int writeIntVLC(byte[] data, int value) {
+        return writeIntVLC(data, 0, data.length, value);
     }
 
-    public int writeIntVLC(int value, byte[] data, int startIndex) {
-        return writeIntVLC(value, data, startIndex, data.length);
+    public int writeIntVLC(byte[] data, int startIndex, int value) {
+        return writeIntVLC(data, startIndex, data.length, value);
     }
 
-    public int writeIntVLC(int value, byte[] data, int startIndex, int endIndex) {
+    public int writeIntVLC(byte[] data, int startIndex, int endIndex, int value) {
         if (value < VLC_INT_TRESHOLD) {
             data[startIndex] = (byte) value;
             return 1;
@@ -169,27 +170,36 @@
         return readed;
     }
 
-    public int writeLongVLC(long value, ByteBuff buffer) {
-        int len = writeLongVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity());
+    public void writeLong6(ByteBuffer buffer, long val) {
+        writeLongN(buffer, 6, val);
+    }
+
+    public void writeLongN(ByteBuffer buffer, int bytes, long val) {
+        writeLongN(buffer.array(), buffer.position(), bytes, val);
+        buffer.position(buffer.position() + bytes);
+    }
+
+    public int writeLongVLC(ByteBuff buffer, long value) {
+        int len = writeLongVLC(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value);
         buffer.endIndex(buffer.endIndex() + len);
         return len;
     }
 
-    public int writeLongVLC(long value, ByteBuffer buffer) {
-        int len = writeLongVLC(value, buffer.array(), buffer.position(), buffer.limit());
+    public int writeLongVLC(ByteBuffer buffer, long value) {
+        int len = writeLongVLC(buffer.array(), buffer.position(), buffer.limit(), value);
         buffer.position(buffer.position() + len);
         return len;
     }
 
-    public int writeLongVLC(long value, byte[] data) {
-        return writeLongVLC(value, data, 0, data.length);
+    public int writeLongVLC(byte[] data, long value) {
+        return writeLongVLC(data, 0, data.length, value);
     }
 
-    public int writeLongVLC(long value, byte[] data, int startIndex) {
-        return writeLongVLC(value, data, startIndex, data.length);
+    public int writeLongVLC(byte[] data, int startIndex, long value) {
+        return writeLongVLC(data, startIndex, data.length, value);
     }
 
-    public int writeLongVLC(long value, byte[] data, int startIndex, int endIndex) {
+    public int writeLongVLC(byte[] data, int startIndex, int endIndex, long value) {
         if (value < VLC_LONG_TRESHOLD) {
             if (endIndex - startIndex < 2) {
                 throw new IndexOutOfBoundsException();
@@ -211,28 +221,28 @@
         }
     }
 
-    public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer) {
-        return writeByteStringNullTerminated(value, buffer, true);
+    public int writeByteStringNullTerminated(ByteBuff buffer, ByteString value) {
+        return writeByteStringNullTerminated(buffer, value, true);
     }
 
-    public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer, boolean sanitize) {
+    public int writeByteStringNullTerminated(ByteBuff buffer, ByteString value, boolean sanitize) {
         buffer.ensureCapacity(value.length());
-        int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.endIndex(), buffer.capacity(), sanitize);
+        int len = writeByteStringNullTerminated(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value, sanitize);
         buffer.endIndex(buffer.endIndex() + len);
         return len;
     }
 
-    public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer) {
-        return writeByteStringNullTerminated(value, buffer, true);
+    public int writeByteStringNullTerminated(ByteBuffer buffer, ByteString value) {
+        return writeByteStringNullTerminated(buffer, value, true);
     }
 
-    public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer, boolean sanitize) {
-        int len = writeByteStringNullTerminated(value, buffer.array(), buffer.position(), buffer.limit(), sanitize);
+    public int writeByteStringNullTerminated(ByteBuffer buffer, ByteString value, boolean sanitize) {
+        int len = writeByteStringNullTerminated(buffer.array(), buffer.position(), buffer.limit(), value, sanitize);
         buffer.limit(buffer.limit() + len);
         return len;
     }
 
-    public int writeByteStringNullTerminated(ByteString value, byte data[], int startIndex, int endIndex, boolean sanitize) {
+    public int writeByteStringNullTerminated(byte data[], int startIndex, int endIndex, ByteString value, boolean sanitize) {
         int len = 0;
         byte[] valData = value.getBytes();
         if (!value.isEmpty()) {
@@ -268,7 +278,7 @@
         throw new IllegalArgumentException("Unable to find NULL delimiter.");
     }
 
-    public int writeStringNullTerminated(String value, ByteBuff buffer) {
+    public int writeStringNullTerminated(ByteBuff buffer, String value) {
         if (value != null && !value.isEmpty()) {
             byte[] data = value.getBytes(CHARSET);
             buffer.append(data);
@@ -280,7 +290,7 @@
         return 1;
     }
 
-    public int writeStringNullTerminated(String value, ByteBuffer buffer) {
+    public int writeStringNullTerminated(ByteBuffer buffer, String value) {
         if (value != null && !value.isEmpty()) {
             byte[] data = value.getBytes(CHARSET);
             buffer.put(data);
@@ -292,7 +302,7 @@
         return 1;
     }
 
-    public int writeIpAddress(IpAddress ip, ByteBuffer buffer) {
+    public int writeIpAddress(ByteBuffer buffer, IpAddress ip) {
         byte[] data = ip.getAddress();
         buffer.put((byte) ip.getVersion());
         buffer.put(data);
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java	Tue Nov 28 11:46:55 2017 +0100
@@ -14,14 +14,16 @@
 
     public static final byte FLAG_REQUEST = (byte) 0b10000000;
 
-    public static final byte FLAG_ENCODED = (byte) 0b01000000;
+    public static final byte FLAG_RESPONSE = (byte) 0b01000000;
 
-    public static final byte FLAG_COMPRESSED = (byte) 0b00100000;
+    public static final byte FLAG_ENCODED = (byte) 0b00100000;
+
+    public static final byte FLAG_COMPRESSED = (byte) 0b00010000;
 
     public static final byte VERSION_1_0 = 1;
-    
+
     public static final byte VERSION_1_1 = 2;
-    
+
     public static final int CUSTOM_HEADER_CODE = 0;
 
     private static final Map<Byte, ByteString> CODE_TO_HEADER;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionBlock.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionBlock.java	Tue Nov 28 11:46:55 2017 +0100
@@ -8,7 +8,7 @@
  */
 public abstract class NcSessionBlock implements NcDataBlock {
 
-    public static final int HEADER_SIZE = 12;
+    public static final int HEADER_SIZE = 12 + NcDataBlock.HEADER_SIZE;
 
     private long timestamp;
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java	Tue Nov 28 11:46:55 2017 +0100
@@ -11,15 +11,15 @@
 
     public static final byte TYPE = 3;
 
-    public static final int MAX_HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 29;
+    public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 14;
 
     private long totalSize;
 
-    private int proto;
+    private long dataSize;
 
-    private long dataLength;
+    private long optionsSize;
 
-    private long optionsLength = 0;
+    private int proto;
 
     private Object data;
 
@@ -29,13 +29,13 @@
     }
 
     public NcSessionPayloadBlock(long timestamp, int sessionId, SessionInfo sessionInfo,
-            long totalSize, int proto, long dataLength, long optionsLength,
+            long totalSize, long dataSize, int proto,
             Object data, Map<String, Object> options) {
         super(timestamp, sessionId, sessionInfo);
         this.totalSize = totalSize;
+        this.dataSize = dataSize;
+        this.optionsSize = totalSize - HEADER_SIZE - dataSize;
         this.proto = proto;
-        this.dataLength = dataLength;
-        this.optionsLength = optionsLength;
         this.options = options;
         this.data = data;
     }
@@ -61,20 +61,19 @@
         this.proto = proto;
     }
 
-    public long dataLength() {
-        return dataLength;
+    /** Returns the data size recorded for this block. */
+    public long dataSize() {
+        return dataSize;
     }
 
-    public void dataLength(long dataLength) {
-        this.dataLength = dataLength;
+    /** Sets the data size only; {@code optionsSize} is not recomputed here. */
+    public void dataSize(long dataSize) {
+        this.dataSize = dataSize;
     }
 
-    public long optionsLength() {
-        return optionsLength;
-    }
-
-    public void optionsLength(long optionsLength) {
-        this.optionsLength = optionsLength;
+    /** Returns the options size; the constructor sets it to totalSize - HEADER_SIZE - dataSize. */
+    public long optionsSize() {
+        return optionsSize;
     }
 
     public Object data() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java	Tue Nov 28 11:46:55 2017 +0100
@@ -14,7 +14,7 @@
     public static NcDataHelper NC_HELPER = NcDataHelper.getInstance();
 
     public void writeOptions(List<Option> options, ByteBuff buff) {
-        NC_HELPER.writeIntVLC(options.size(), buff);
+        NC_HELPER.writeIntVLC(buff, options.size());
 
         for (Option option : options) {
             String name = option.getName();
@@ -26,7 +26,7 @@
 
             buff.append(code);
             if (code == 0) {
-                NC_HELPER.writeStringNullTerminated(name, buff);
+                NC_HELPER.writeStringNullTerminated(buff, name);
             }
             coder.encode(option.getValue(), buff);
         }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoders.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoders.java	Tue Nov 28 11:46:55 2017 +0100
@@ -31,7 +31,7 @@
 
         @Override
         public void encode(Integer value, ByteBuff buff) {
-            NC_HELPER.writeIntVLC(value, buff);
+            NC_HELPER.writeIntVLC(buff, value);
         }
 
         @Override
@@ -57,7 +57,7 @@
 
         @Override
         public void encode(Long value, ByteBuff buff) {
-            NC_HELPER.writeLongVLC(value, buff);
+            NC_HELPER.writeLongVLC(buff, value);
         }
 
         @Override
@@ -70,7 +70,7 @@
 
         @Override
         public void encode(ByteString value, ByteBuff buff) {
-            NC_HELPER.writeByteStringNullTerminated(value, buff);
+            NC_HELPER.writeByteStringNullTerminated(buff, value);
         }
 
         @Override
@@ -83,7 +83,7 @@
 
         @Override
         public void encode(String value, ByteBuff buff) {
-            NC_HELPER.writeStringNullTerminated(value, buff);
+            NC_HELPER.writeStringNullTerminated(buff, value);
         }
 
         @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/VarOptionCoder.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/VarOptionCoder.java	Tue Nov 28 11:46:55 2017 +0100
@@ -13,9 +13,9 @@
 
     @Override
     public void encode(VarOption value, ByteBuff buff) {
-        NC_HELPER.writeStringNullTerminated(value.getName(), buff);
-        NC_HELPER.writeLongVLC(value.getStartOffset(), buff);
-        NC_HELPER.writeLongVLC(value.getEndOffset(), buff);
+        NC_HELPER.writeStringNullTerminated(buff, value.getName());
+        NC_HELPER.writeLongVLC(buff, value.getStartOffset());
+        NC_HELPER.writeLongVLC(buff, value.getEndOffset());
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Tue Nov 28 11:46:55 2017 +0100
@@ -5,8 +5,6 @@
 import com.passus.commons.service.ServiceException;
 import com.passus.data.ByteBuff;
 import com.passus.data.ByteBuffDataSource;
-import com.passus.net.http.HttpRequest;
-import com.passus.net.session.SessionKey;
 import com.passus.st.client.EventHandler;
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.plugin.PluginConstants;
@@ -15,7 +13,8 @@
 import com.passus.st.reader.nc.NcSessionPayloadBlock;
 import com.passus.st.reader.nc.NcSessionStatusBlock;
 import java.io.IOException;
-import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  *
@@ -24,6 +23,8 @@
 @Plugin(name = NcEventSource.TYPE, category = PluginConstants.CATEGORY_EVENT_SOURCE)
 public class NcEventSource implements EventSource {
 
+    private static final Logger LOGGER = LogManager.getLogger(NcEventSource.class);
+
     public static final String TYPE = "nc";
 
     private String ncFile;
@@ -38,6 +39,8 @@
 
     private String name;
 
+    private ReaderThread readerThread;
+
     @Override
     public String getType() {
         return TYPE;
@@ -100,6 +103,16 @@
             }
         }
 
+        if (readerThread != null) {
+            readerThread.working = false;
+            readerThread.interrupt();
+
+            try {
+                readerThread.join(5_000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+
         started = false;
     }
 
@@ -112,26 +125,48 @@
         }
     }
 
-    private void read() throws IOException {
-        while (!reader.eof()) {
-            byte blockType = reader.peekBlockType();
-            switch (blockType) {
-                case NcSessionStatusBlock.TYPE:
-                    NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
-                    SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
-                    handler.handle(event);
-                    break;
-                case NcSessionPayloadBlock.TYPE:
-                    NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
-                    ByteBuff payload = readPayload(payloadBlock.data());
+    /**
+     * Reads blocks from {@code reader} until EOF or until {@code working}
+     * is cleared. Session status blocks are passed to {@code handler} as
+     * {@link SessionStatusEvent}s; payload blocks are read but not yet
+     * dispatched; any other block is read and discarded.
+     */
+    private class ReaderThread extends Thread {
 
-                    /*HttpMessage msg = httpReader.readMessage(payload);
+        // volatile: the enclosing class clears this flag from outside the
+        // reader thread, and the loop in run() must see that write.
+        private volatile boolean working = false;
+
+        @Override
+        public void run() {
+            try {
+                working = true;
+                while (working && !reader.eof()) {
+                    byte blockType = reader.peekBlockType();
+                    switch (blockType) {
+                        case NcSessionStatusBlock.TYPE:
+                            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
+                            SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
+                            handler.handle(event);
+                            break;
+                        case NcSessionPayloadBlock.TYPE:
+                            NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
+                            ByteBuff payload = readPayload(payloadBlock.data());
+
+                            /*HttpMessage msg = httpReader.readMessage(payload);
                     handler.handle(event);*/
-                    break;
-                default:
-                    reader.read();
+                            break;
+                        default:
+                            reader.read();
+                    }
+                }
+            } catch (IOException e) {
+                LOGGER.debug(e.getMessage(), e);
             }
+
+            working = false;
         }
+
     }
 
 }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java	Tue Nov 28 11:46:55 2017 +0100
@@ -23,7 +23,7 @@
     }
 
     @Test
-    public void testDecode_Request() throws Exception {
+    public void testDecodeMessage_Request() throws Exception {
         ByteBuff buffer = new HeapByteBuff();
         HttpRequest req = HttpRequestBuilder.get("http://test.com/test")
                 .header("X-Header", "X-Header-Value")
@@ -36,7 +36,7 @@
     }
 
     @Test
-    public void testDecode_Response() throws Exception {
+    public void testDecodeMessage_Response() throws Exception {
         ByteBuff buffer = new HeapByteBuff();
         HttpResponse resp = HttpResponseBuilder.ok()
                 .header("X-Header", "X-Header-Value")
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Tue Nov 28 11:46:55 2017 +0100
@@ -10,12 +10,7 @@
 import java.io.IOException;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
-import static org.mockito.Mockito.mock;
 import static org.testng.Assert.*;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Tue Nov 28 11:46:55 2017 +0100
@@ -70,8 +70,8 @@
                 assertEquals(NcSessionPayloadBlock.TYPE, block.type());
                 NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block;
                 assertSame(sessionInfoBlock.sessionInfo(), payloadBlock.sessionInfo());
-                assertEquals(data.length, payloadBlock.dataLength());
-                assertEquals(0, payloadBlock.optionsLength());
+                assertEquals(data.length, payloadBlock.dataSize());
+                assertEquals(0, payloadBlock.optionsSize());
                 assertTrue(payloadBlock.data() instanceof ByteBuff);
                 ByteBuff payload = (ByteBuff) payloadBlock.data();
                 assertEquals(data, payload.toArray());
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java	Tue Nov 28 11:08:29 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java	Tue Nov 28 11:46:55 2017 +0100
@@ -73,7 +73,7 @@
     @Test(dataProvider = "validIntVLC")
     public void testWriteIntVLC(byte[] expectedResult, int expectedBytes, int length) {
         byte[] result = new byte[expectedResult.length];
-        helper.writeIntVLC(length, result);
+        helper.writeIntVLC(result, length);
         assertEquals(expectedResult, result);
     }
 
@@ -102,7 +102,7 @@
     @Test(dataProvider = "validLongVLC")
     public void testWriteLongVLC(byte[] expectedResult, int expectedBytes, long length) {
         byte[] result = new byte[expectedResult.length];
-        helper.writeLongVLC(length, result);
+        helper.writeLongVLC(result, length);
         assertEquals(expectedResult, result);
     }
 
@@ -110,19 +110,19 @@
     public void testWriteStringNullTerminated() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
 
-        helper.writeStringNullTerminated("", buffer);
+        helper.writeStringNullTerminated(buffer, "");
         buffer.flip();
         assertEquals(1, buffer.remaining());
         assertEquals(AsciiUtils.NUL, buffer.get());
         buffer.clear();
 
-        helper.writeStringNullTerminated(null, buffer);
+        helper.writeStringNullTerminated(buffer, null);
         buffer.flip();
         assertEquals(1, buffer.remaining());
         assertEquals(AsciiUtils.NUL, buffer.get());
         buffer.clear();
         
-        helper.writeStringNullTerminated("123", buffer);
+        helper.writeStringNullTerminated(buffer, "123");
         buffer.flip();
         assertEquals(4, buffer.remaining());
         assertEquals(0x31, buffer.get());