changeset 672:7bc7cfb26ddf

NC file in progress
author Devel 2
date Mon, 20 Nov 2017 14:15:18 +0100
parents a38680f29c27
children 87bb120e72d7
files stress-tester/src/main/java/com/passus/st/reader/DataWriterImpl.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 10 files changed, 516 insertions(+), 157 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/DataWriterImpl.java	Mon Nov 20 10:23:34 2017 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,61 +0,0 @@
-package com.passus.st.reader;
-
-import com.passus.net.IpAddress;
-
-public class DataWriterImpl implements DataWriter {
-
-    @Override
-    public void writeLength(long length) {
-        
-    }
-
-    public void writeBytes(byte[] value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeByte(byte value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeInt2(int value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeInt3(int value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeInt4(int value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeLong4(long value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeLong8(long value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeString(String value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeStringNullTerminated(String value) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void writeIpAddress(IpAddress ipAddress) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java	Mon Nov 20 14:15:18 2017 +0100
@@ -0,0 +1,16 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.http.HttpMessage;
+import java.io.IOException;
+
+/**
+ * Placeholder for decoding HTTP messages from an NcDataBlockReader; read is not implemented yet and returns null.
+ * @author Mirosław Hawrot
+ */
+public class HttpSessionPayloadEventDataReader {
+
+    private HttpMessage read(NcDataBlockReader reader) throws IOException {
+        return null;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java	Mon Nov 20 14:15:18 2017 +0100
@@ -0,0 +1,114 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataHelper;
+import com.passus.data.DataSource;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.http.HttpConsts;
+import com.passus.net.http.HttpHeaderEntry;
+import com.passus.net.http.HttpHeaders;
+import com.passus.net.http.HttpMessage;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
+import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST;
+import java.io.IOException;
+import java.nio.ByteOrder;
+
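+/**
+ * Writes the request and response of an HttpSessionPayloadEvent as session payload records of an NcDataBlockWriter.
+ * @author Mirosław Hawrot
+ */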
+public class HttpSessionPayloadEventDataWriter {
+
+    private final NcDataHelper ncDataHelper = NcDataHelper.getInstance();
+
+    private final DataHelper dataHelper = DataHelper.get(ByteOrder.BIG_ENDIAN);
+
+    /** Appends the VLC length-prefixed header section of {@code msg} to {@code buff}; returns the bytes appended. */
+    private int encodeHeaders(HttpMessage msg, ByteBuff buff) {
+        HttpHeaders headers = msg.getHeaders();
+        ByteBuff headersBuff = new HeapByteBuff();
+        for (HttpHeaderEntry entry : headers.getEntries()) {
+            int code = NcHttpDataUtils.headerToCode(entry.getName());
+            if (code == -1) {
+                // Not in the code table: one marker byte followed by the literal header name.
+                headersBuff.append((byte) NcHttpDataUtils.CUSTOM_HEADER_CODE);
+                ncDataHelper.writeByteStringNullTerminated(entry.getName(), headersBuff);
+            } else {
+                headersBuff.append((byte) code);
+            }
+            ncDataHelper.writeByteStringNullTerminated(entry.getValue(), headersBuff);
+        }
+        int headersLen = headersBuff.readableBytes();
+        int prefixLen = ncDataHelper.writeLongVLC(headersLen, buff);
+        buff.append(headersBuff);
+        return prefixLen + headersLen;
+    }
+
+    private void writeVersion(HttpMessage msg, ByteBuff buff) throws IOException {
+        if (HttpConsts.VERSION_1_0.equals(msg.getVersion())) {
+            buff.append(NcHttpDataUtils.VERSION_1_0);
+        } else if (HttpConsts.VERSION_1_1.equals(msg.getVersion())) {
+            buff.append(NcHttpDataUtils.VERSION_1_1);
+        } else {
+            throw new IOException("Not supported HTTP version '" + msg.getVersion() + "'.");
+        }
+    }
+
+    /**
+     * Encodes one HTTP message and writes it as a single session payload record.
+     * A {@code null} message is skipped. The length passed to the payload header
+     * counts every byte appended to the record plus the message body.
+     */
+    private void writeMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException {
+        if (msg == null) {
+            return;
+        }
+
+        DataSource content = msg.getContent();
+        ByteBuff buff = new HeapByteBuff();
+
+        // The flags byte is written exactly once, as the first byte of the record,
+        // so that the declared length matches the bytes actually written.
+        byte flags = msg.isRequest() ? FLAG_REQUEST : (byte) 0;
+        buff.append(flags);
+        long len = 1;
+
+        if (msg.isRequest()) {
+            HttpRequest req = (HttpRequest) msg;
+            len += ncDataHelper.writeByteStringNullTerminated(req.getMethod().toByteString(), buff);
+            len += ncDataHelper.writeByteStringNullTerminated(req.getUri(), buff);
+        } else {
+            HttpResponse resp = (HttpResponse) msg;
+            dataHelper.writeInt2(buff, resp.getStatus().getCode());
+            len += 2;
+            len += ncDataHelper.writeByteStringNullTerminated(resp.getStatus().getReasonPhrase(), buff);
+        }
+
+        writeVersion(msg, buff);
+        len++;
+        len += encodeHeaders(msg, buff);
+
+        if (content != null) {
+            len += content.available();
+        }
+
+        writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, len);
+        writer.writeSessionPayloadContent(buff);
+
+        if (content != null) {
+            writer.writeSessionPayloadContent(content);
+        }
+    }
+
+    public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException {
+        long time = event.getTimestamp();
+        SessionInfo session = event.getSessionInfo();
+
+        writeMessage(time, session, event.getRequest(), writer);
+        writeMessage(time, session, event.getResponse(), writer);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Mon Nov 20 10:23:34 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Mon Nov 20 14:15:18 2017 +0100
@@ -1,6 +1,7 @@
 package com.passus.st.reader.nc;
 
 import com.passus.commons.Assert;
+import com.passus.data.ByteBuff;
 import com.passus.data.DataHelper;
 import com.passus.st.reader.DataBlockReader;
 import com.passus.st.reader.nc.block.NcDataBlock;
@@ -34,7 +35,9 @@
 
     private int bufferSize = DEFAULT_BUFFER_SIZE;
 
-    private ByteBuffer buffer;
+    private ByteBuffer nioBuffer;
+
+    private ByteBuff buffer;
 
     private ByteBuffer[] buffers;
 
@@ -67,9 +70,9 @@
 
         try {
             ch = FileChannel.open(path, StandardOpenOption.READ);
-            buffer = ByteBuffer.allocate(bufferSize);
+            nioBuffer = ByteBuffer.allocate(bufferSize);
             buffers = new ByteBuffer[1];
-            buffers[0] = buffer;
+            buffers[0] = nioBuffer;
             opened = true;
 
             readHeader();
@@ -90,27 +93,58 @@
         }
 
         ch = null;
-        buffer = null;
+        nioBuffer = null;
         buffers = null;
         opened = false;
     }
 
+    private void checkOpened() {
+        if (!opened) {
+            throw new IllegalStateException("Reader is not opened.");
+        }
+    }
+
+    private void skip(int length) throws IOException {
+        ch.position(ch.position() + length);
+    }
+
+    /** Tops up {@code buffer} from the channel until {@code requiredBytes} are readable or EOF is hit. */
+    private void fill(int requiredBytes) throws IOException {
+        while (buffer.readableBytes() < requiredBytes) {
+            nioBuffer.clear();
+            if (ch.read(nioBuffer) == -1) {
+                break;
+            }
+            nioBuffer.flip();
+            buffer.append(java.util.Arrays.copyOf(nioBuffer.array(), nioBuffer.limit()));
+        }
+    }
+
     @Override
     public void reset() throws IOException {
         header = null;
 
     }
 
+    //public 
     private void readBytes(int offset, int length) throws IOException {
-        buffer.clear();
+        nioBuffer.clear();
         long readed = ch.read(buffers, offset, length);
     }
 
+    private void readSessionInfoBlock() {
+
+    }
+
+    private void readBlock() {
+
+    }
+
     private void readHeader() throws IOException {
         readBytes(0, bufferSize);
 
-        byte[] data = buffer.array();
-        int offset = buffer.arrayOffset();
+        byte[] data = nioBuffer.array();
+        int offset = nioBuffer.arrayOffset();
 
         if (!NcHeader.isPreambule(data, offset)) {
             throw new IOException("Invalid preambule.");
@@ -124,16 +158,14 @@
         }
 
         header = new NcHeader(verMajor, varMinor);
-        buffer.position(NcHeader.PREAMBULE.length - 1);
+        nioBuffer.position(NcHeader.PREAMBULE.length - 1);
     }
 
     @Override
     public NcDataBlock read() throws IOException {
-        if (!opened) {
-            throw new IOException("Reader is not opened.");
-        }
+        checkOpened();
 
-        throw new UnsupportedOperationException("Not supported yet.");
+        return null;
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 20 10:23:34 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 20 14:15:18 2017 +0100
@@ -2,6 +2,7 @@
 
 import com.passus.commons.Assert;
 import com.passus.data.ByteBuff;
+import com.passus.data.ByteBuffDataSource;
 import com.passus.data.DataSource;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.DataBlockWriter;
@@ -15,8 +16,6 @@
 import java.nio.file.StandardOpenOption;
 import java.util.HashMap;
 import java.util.Map;
-import static com.passus.st.reader.nc.NcDataUtilsTmp.writeStringNullTerminated;
-import static com.passus.st.reader.nc.NcDataUtilsTmp.writeIpAddress;
 
 /**
  *
@@ -28,6 +27,8 @@
 
     private static final int DEFAULT_BYTE_BUFFER = 64 * 1024;
 
+    private NcDataHelper dataHelper = NcDataHelper.getInstance();
+
     private final Path path;
 
     private FileChannel ch;
@@ -68,6 +69,12 @@
         this.maxSegmentBlocks = maxSegmentBlocks;
     }
 
+    private void write(ByteBuffer buff) throws IOException {
+        buff.flip();
+        ch.write(buff);
+        buff.clear();
+    }
+
     private void write(ByteBuffer[] buffers) throws IOException {
         for (ByteBuffer buff : buffers) {
             buff.flip();
@@ -79,9 +86,7 @@
     }
 
     private void writeBuffer() throws IOException {
-        buffer.flip();
-        ch.write(buffer);
-        buffer.clear();
+        write(buffer);
     }
 
     public void flush() throws IOException {
@@ -137,8 +142,8 @@
     private int writeHeader() throws IOException {
         buffer.clear();
         buffer.put(NcHeader.PREAMBULE);
-        buffer.put(NcDataUtils.VERSION_MAJOR);
-        buffer.put(NcDataUtils.VERSION_MINOR);
+        buffer.put(NcDataHelper.VERSION_MAJOR);
+        buffer.put(NcDataHelper.VERSION_MINOR);
         buffer.putLong(0L);
         writeBuffer();
         return NcHeader.SIZE;
@@ -158,11 +163,11 @@
         }
 
         buffer.putInt(sessionId);
-        writeStringNullTerminated(buffer, sessionInfo.getSourceName());
+        dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer);
         buffer.put((byte) sessionInfo.getTransport());
-        writeIpAddress(buffer, sessionInfo.getSrcIp());
+        dataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer);
         buffer.putShort((short) sessionInfo.getSrcPort());
-        writeIpAddress(buffer, sessionInfo.getDstIp());
+        dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer);
         buffer.putShort((short) sessionInfo.getDstPort());
         return sessionId;
     }
@@ -174,53 +179,60 @@
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(status);
-        
+
         writeBuffer();
     }
 
+    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException {
+        checkOpened();
+
+        int sessionId = getSessionId(session, opened);
+        buffer.clear();
+        buffer.putLong(timestamp);
+        buffer.putInt(sessionId);
+        buffer.put(proto);
+        dataHelper.writeLongVLC(dataLength, buffer);
+
+        writeBuffer();
+    }
+
+    private ByteBuffer wrapData(Object data) throws IOException {
+        ByteBuffer dataBuffer;
+        if (data == null) {
+            return ByteBuffer.allocate(0);
+        } else if (data instanceof byte[]) {
+            byte[] arrData = (byte[]) data;
+            return ByteBuffer.wrap(arrData);
+        } else if (data instanceof ByteBuffDataSource) {
+            ByteBuffDataSource dsData = (ByteBuffDataSource) data;
+            return dsData.getByteBuffer().toNioByteBuffer(false);
+        } else if (data instanceof DataSource) {
+            DataSource dsData = (DataSource) data;
+            int length = dsData.available();
+            dataBuffer = ByteBuffer.allocate(length);
+            dsData.write(dataBuffer.array(), 0, length);
+            dataBuffer.limit(length);
+            return dataBuffer;
+        } else if (data instanceof ByteBuff) {
+            ByteBuff bbData = (ByteBuff) data;
+            return bbData.toNioByteBuffer(false);
+        } else {
+            throw new IllegalArgumentException("Not supported data class '" + data.getClass() + "'.");
+        }
+    }
+
+    public void writeSessionPayloadContent(Object data) throws IOException {
+        checkOpened();
+        ByteBuffer dataBuffer = wrapData(data);
+        ch.write(dataBuffer); // already positioned for reading; write(ByteBuffer) would flip it to empty
+    }
+
     public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException {
         checkOpened();
-        writeSessionPayload(timestamp, session, proto, data, (byte) 0);
-    }
-
-    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data, byte flags) throws IOException {
-        checkOpened();
-
-        int sessionId = sessionIdMap.get(session);
-
-        buffer.clear();
-        buffer.putLong(timestamp);
-        buffer.putInt(sessionId);
-        buffer.put(flags);
-        buffer.put(proto);
 
-        ByteBuffer dataBuffer;
-        if (data == null) {
-            buffer.putInt(0);
-            dataBuffer = null;
-        } else if (data instanceof byte[]) {
-            byte[] arrData = (byte[]) data;
-            buffer.putInt(arrData.length); //@TODO Length - zmien na format docelowy
-            dataBuffer = ByteBuffer.wrap(arrData);
-        } else if (data instanceof DataSource) {
-            DataSource dsData = (DataSource) data;
-            int length = dsData.available();
-            buffer.putInt(length);
-            dataBuffer = ByteBuffer.allocate(length);
-            dsData.write(dataBuffer.array(), 0, length);
-        } else if (data instanceof ByteBuff) {
-            ByteBuff bbData = (ByteBuff) data;
-            dataBuffer = bbData.toNioByteBuffer(false);
-            buffer.putInt(bbData.readableBytes());
-        } else {
-            throw new IllegalArgumentException("Not supported data class '" + data.getClass() + "'.");
-        }
-
-        if (dataBuffer == null) {
-            write(new ByteBuffer[]{buffer, dataBuffer});
-        } else {
-            writeBuffer();
-        }
+        ByteBuffer dataBuffer = wrapData(data);
+        writeSessionPayloadHeader(timestamp, session, proto, dataBuffer.remaining());
+        ch.write(dataBuffer); // already positioned for reading; write(ByteBuffer) would flip it to empty
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Mon Nov 20 10:23:34 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Mon Nov 20 14:15:18 2017 +0100
@@ -33,7 +33,7 @@
 
     private static final ThreadLocal<NcDataHelper> THREAD_LOCAL = new ThreadLocal<NcDataHelper>() {
         @Override
-        public NcDataHelper get() {
+        protected NcDataHelper initialValue() {
             return new NcDataHelper();
         }
 
@@ -63,22 +63,6 @@
         return buffer.toByteString(startIndex, endIndex);
     }
 
-    public int writeLongVLC(ByteBuff buffer, long value) {
-        if (value < VLC_LONG_TRESHOLD) {
-            buffer.append((byte) (value >> 8));
-            buffer.append((byte) value);
-            return 2;
-        } else {
-            int bytes = DataUtils.countBytes(value);
-            byte[] arr = new byte[bytes];
-            DataUtils.putLong(arr, 0, bytes, value);
-            buffer.append((byte) (bytes >> 8));
-            buffer.append((byte) bytes);
-            buffer.append(arr);
-            return bytes + 2;
-        }
-    }
-
     public int readIntVLC(ByteBuff buffer) {
         int len = getIntVLC(buffer.buffer(), buffer.startIndex(), buffer.endIndex(), mutableInt);
         buffer.skipBytes(len);
@@ -112,8 +96,8 @@
     }
 
     public int writeIntVLC(int value, ByteBuff buffer) {
-        int len = writeIntVLC(value, buffer.buffer(), buffer.startIndex(), buffer.endIndex());
-        buffer.skipBytes(len);
+        int len = writeIntVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity()); // append after existing data
+        buffer.endIndex(buffer.endIndex() + len);
         return len;
     }
 
@@ -137,7 +121,7 @@
             return 1;
         } else {
             int bytes = DataUtils.countBytes(value);
-            if (bytes > (endIndex - startIndex + 1)) {
+            if (bytes + 1 > (endIndex - startIndex)) {
                 throw new IndexOutOfBoundsException();
             }
 
@@ -182,30 +166,108 @@
         return readed;
     }
 
+    public int writeLongVLC(long value, ByteBuff buffer) {
+        int len = writeLongVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity()); // append after existing data
+        buffer.endIndex(buffer.endIndex() + len);
+        return len;
+    }
+
     public int writeLongVLC(long value, ByteBuffer buffer) {
+        int len = writeLongVLC(value, buffer.array(), buffer.position(), buffer.limit());
+        buffer.position(buffer.position() + len);
+        return len;
+    }
+
+    public int writeLongVLC(long value, byte[] data) {
+        return writeLongVLC(value, data, 0, data.length);
+    }
+
+    public int writeLongVLC(long value, byte[] data, int startIndex) {
+        return writeLongVLC(value, data, startIndex, data.length);
+    }
+
+    public int writeLongVLC(long value, byte[] data, int startIndex, int endIndex) {
         if (value < VLC_LONG_TRESHOLD) {
-            buffer.put((byte) (value >> 8));
-            buffer.put((byte) value);
+            if (endIndex - startIndex < 2) {
+                throw new IndexOutOfBoundsException();
+            }
+
+            data[startIndex++] = (byte) (value >> 8);
+            data[startIndex++] = (byte) value;
             return 2;
         } else {
             int bytes = DataUtils.countBytes(value);
-            byte[] arr = new byte[bytes];
-            DataUtils.putLong(arr, 0, bytes, value);
-            buffer.put((byte) (bytes >> 8));
-            buffer.put((byte) bytes);
-            buffer.put(arr);
+            if (endIndex - startIndex < bytes + 2) {
+                throw new IndexOutOfBoundsException();
+            }
+            data[startIndex++] = (byte) (bytes >> 8);
+            data[startIndex++] = (byte) bytes;
+            // The two bytes above carry the byte count; the value itself follows.
+            DataUtils.putLong(data, startIndex, bytes, value);
             return bytes + 2;
         }
     }
 
-    public int writeStringNullTerminated(ByteBuffer buffer, String value) {
-        byte[] data = value.getBytes(CHARSET);
-        buffer.put(data);
-        buffer.put(AsciiUtils.NUL);
-        return data.length + 1;
+    public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer) {
+        return writeByteStringNullTerminated(value, buffer, true);
     }
 
-    public int writeIpAddress(ByteBuffer buffer, IpAddress ip) {
+    public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer, boolean sanitize) {
+        if (buffer.writableBytes() < value.length() + 1) {
+            buffer.ensureCapacity(value.length() + 1); // room for the NUL terminator too
+        }
+        // Append after the data already in the buffer instead of overwriting it from startIndex.
+        int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.endIndex(), buffer.capacity(), sanitize);
+        buffer.endIndex(buffer.endIndex() + len);
+        return len;
+    }
+
+    public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer) {
+        return writeByteStringNullTerminated(value, buffer, true);
+    }
+
+    public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer, boolean sanitize) {
+        int len = writeByteStringNullTerminated(value, buffer.array(), buffer.position(), buffer.limit(), sanitize);
+        buffer.position(buffer.position() + len); // advance the cursor; the limit must not move
+        return len;
+    }
+
+    public int writeByteStringNullTerminated(ByteString value, byte data[], int startIndex, int endIndex, boolean sanitize) {
+        int len = 0;
+        byte[] valData = value.getBytes();
+        if (!value.isEmpty()) {
+            if (sanitize) {
+                for (int i = 0; i < valData.length; i++) {
+                    byte b = valData[i];
+                    if (b != AsciiUtils.NUL) {
+                        len++;
+                        data[startIndex++] = valData[i];
+                    }
+                }
+            } else {
+                len = valData.length;
+                System.arraycopy(valData, 0, data, startIndex, len);
+                startIndex += len;
+            }
+        }
+
+        data[startIndex] = AsciiUtils.NUL;
+        return len + 1;
+    }
+
+    public int writeStringNullTerminated(String value, ByteBuffer buffer) {
+        if (value != null && !value.isEmpty()) {
+            byte[] data = value.getBytes(CHARSET);
+            buffer.put(data);
+            buffer.put(AsciiUtils.NUL);
+            return data.length + 1;
+        }
+
+        buffer.put(AsciiUtils.NUL);
+        return 1;
+    }
+
+    public int writeIpAddress(IpAddress ip, ByteBuffer buffer) {
         byte[] data = ip.getAddress();
         buffer.put((byte) ip.getVersion());
         buffer.put(data);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataReader.java	Mon Nov 20 14:15:18 2017 +0100
@@ -0,0 +1,19 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.http.HttpMessage;
+import com.passus.st.reader.DataBlockReader;
+import com.passus.st.reader.nc.block.NcDataBlock;
+import java.io.IOException;
+
+/**
+ * NcDataReader for HttpMessage; not implemented yet, read always throws UnsupportedOperationException.
+ * @author Mirosław Hawrot
+ */
+public class NcHttpDataReader implements NcDataReader<HttpMessage> {
+
+    @Override
+    public HttpMessage read(DataBlockReader<NcDataBlock> reader) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java	Mon Nov 20 14:15:18 2017 +0100
@@ -0,0 +1,84 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteString;
+import com.passus.net.http.HttpHeaders;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
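+/**
+ * Flag and version constants plus the lookup tables between HTTP header names and their one-byte codes.
+ * @author Mirosław Hawrot
+ */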
+public class NcHttpDataUtils {
+
+    public static final byte FLAG_REQUEST = (byte) 0b10000000;
+
+    public static final byte FLAG_ENCODED = (byte) 0b01000000;
+
+    public static final byte FLAG_COMPRESSED = (byte) 0b00100000;
+
+    public static final byte VERSION_1_0 = 1;
+    
+    public static final byte VERSION_1_1 = 2;
+    
+    public static final int CUSTOM_HEADER_CODE = 0;
+
+    private static final Map<Byte, ByteString> CODE_TO_HEADER;
+
+    private static final Map<ByteString, Byte> HEADER_TO_CODE;
+
+    static {
+        Map<Byte, ByteString> codeToHeader = new HashMap<>();
+        codeToHeader.put((byte) 1, HttpHeaders.ACCEPT);
+        codeToHeader.put((byte) 2, HttpHeaders.ACCEPT_CHARSET);
+        codeToHeader.put((byte) 3, HttpHeaders.ACCEPT_ENCODING);
+        codeToHeader.put((byte) 4, HttpHeaders.ACCEPT_LANGUAGE);
+        codeToHeader.put((byte) 5, HttpHeaders.AUTHORIZATION);
+        codeToHeader.put((byte) 6, HttpHeaders.CONTENT_ENCODING);
+        codeToHeader.put((byte) 7, HttpHeaders.CONTENT_LENGTH);
+        codeToHeader.put((byte) 8, HttpHeaders.CONTENT_TYPE);
+        codeToHeader.put((byte) 9, HttpHeaders.CONTENT_DISPOSITION);
+        codeToHeader.put((byte) 10, HttpHeaders.CONTENT_LANGUAGE);
+        codeToHeader.put((byte) 11, HttpHeaders.COOKIE);
+        codeToHeader.put((byte) 12, HttpHeaders.COOKIE2);
+        codeToHeader.put((byte) 13, HttpHeaders.DATE);
+        codeToHeader.put((byte) 14, HttpHeaders.EXPIRES);
+        codeToHeader.put((byte) 15, HttpHeaders.ETAG);
+        codeToHeader.put((byte) 16, HttpHeaders.HOST);
+        codeToHeader.put((byte) 17, HttpHeaders.KEEP_ALIVE);
+        codeToHeader.put((byte) 18, HttpHeaders.LOCATION);
+        codeToHeader.put((byte) 19, HttpHeaders.ORIGIN);
+        codeToHeader.put((byte) 20, HttpHeaders.PROXY_AUTHENTICATE);
+        codeToHeader.put((byte) 21, HttpHeaders.PROXY_AUTHORIZATION);
+        codeToHeader.put((byte) 22, HttpHeaders.REFERER);
+        codeToHeader.put((byte) 23, HttpHeaders.SERVER);
+        codeToHeader.put((byte) 24, HttpHeaders.SET_COOKIE);
+        codeToHeader.put((byte) 25, HttpHeaders.SET_COOKIE2);
+        codeToHeader.put((byte) 26, HttpHeaders.TRANSFER_ENCODING);
+        codeToHeader.put((byte) 27, HttpHeaders.USER_AGENT);
+        codeToHeader.put((byte) 28, HttpHeaders.VARY);
+        codeToHeader.put((byte) 29, HttpHeaders.VIA);
+        codeToHeader.put((byte) 30, HttpHeaders.WWW_AUTHENTICATE);
+
+        Map<ByteString, Byte> headerToCode = new HashMap<>(codeToHeader.size());
+        codeToHeader.forEach((code, header) -> headerToCode.put(header, code));
+
+        CODE_TO_HEADER = Collections.unmodifiableMap(codeToHeader);
+        HEADER_TO_CODE = Collections.unmodifiableMap(headerToCode);
+    }
+
+    public static byte headerToCode(CharSequence name) {
+        ByteString bs = ByteString.create(name);
+        Byte code = HEADER_TO_CODE.get(bs);
+        if (code == null) {
+            return -1;
+        }
+
+        return code;
+    }
+
+    public static ByteString codeToHeader(byte code) {
+        return CODE_TO_HEADER.get(code);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java	Mon Nov 20 14:15:18 2017 +0100
@@ -0,0 +1,62 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestBuilder;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ *
+ * @author Mirosław Hawrot
+ */
+public class HttpSessionPayloadEventDataWriterTest {
+
+    private final SessionInfo session;
+
+    public HttpSessionPayloadEventDataWriterTest() throws Exception {
+        session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+    }
+
+    private File createTmpFile() throws IOException {
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st");
+        if (!tmpDir.exists()) {
+            tmpDir.mkdir();
+        }
+
+        return new File(tmpDir, "st_" + UUID.randomUUID().toString() + ".tmp");
+    }
+
+    @Test
+    public void testWrite_HttpRequest() throws Exception {
+        File file = createTmpFile();
+        try {
+            try (NcDataBlockWriter writer = new NcDataBlockWriter(file)) {
+                writer.open();
+
+                HttpSessionPayloadEventDataWriter payloadWriter = new HttpSessionPayloadEventDataWriter();
+                HttpRequest req = HttpRequestBuilder.get("http://test.com/test")
+                        .header("X-Header", "X-Header-Value")
+                        .content("content")
+                        .build();
+
+                payloadWriter.write(new HttpSessionPayloadEvent(session, req, null, ""), writer);
+            }
+            // The written file must at least start with a valid NC preamble.
+            assertTrue(NcHeader.isPreambule(FileUtils.readFileToByteArray(file), 0));
+        } finally {
+            file.delete();
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Mon Nov 20 10:23:34 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Mon Nov 20 14:15:18 2017 +0100
@@ -1,5 +1,6 @@
 package com.passus.st.reader.nc;
 
+import com.passus.st.emitter.SessionInfo;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
@@ -38,4 +39,22 @@
         }
     }
 
+    @Test
+    public void testWrite_WritePayload() throws Exception {
+        File tmpFile = createTmpFile();
+
+        long time = Long.MAX_VALUE;
+        SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+        try {
+            try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
+                writer.open();
+                writer.writeSessionPayload(time, session, (byte) 100, null);
+            }
+
+            byte[] content = FileUtils.readFileToByteArray(tmpFile);
+            assertTrue(NcHeader.isPreambule(content, 0));
+        } finally {
+            tmpFile.delete();
+        }
+    }
 }