changeset 682:e420a96bed43

NC file in progress
author Devel 2
date Thu, 23 Nov 2017 13:21:01 +0100
parents d48d77c2270b
children 4440624b0c6d
files stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/reader/DataReader.java stress-tester/src/main/java/com/passus/st/reader/DataWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java stress-tester/src/main/java/com/passus/st/reader/nc/ReadonlyFileChannelDataSource.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java
diffstat 10 files changed, 295 insertions(+), 112 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Thu Nov 23 13:21:01 2017 +0100
@@ -32,7 +32,7 @@
 
     private final int hashCode;
 
-    private String sourceName;
+    private String sourceName = "";
 
     public SessionInfo(String srcSocket, String dstSocket) throws ParseException {
         this(new SocketAddress(srcSocket), new SocketAddress(dstSocket));
--- a/stress-tester/src/main/java/com/passus/st/reader/DataReader.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/DataReader.java	Thu Nov 23 13:21:01 2017 +0100
@@ -1,45 +1,13 @@
 package com.passus.st.reader;
 
-import com.passus.net.IpAddress;
+import java.io.IOException;
 
 /**
  *
  * @author Mirosław Hawrot
  */
-public interface DataReader {
-
-    public long size();
-    
-    public long remains();
-    
-    public int position();
-
-    public void position(long pos);
-
-    public void skip(int length);
-    
-    public int read();
-    
-    public long readLength();
+public interface DataReader<T, K> {
 
-    public byte readByte();
-
-    public byte[] readBytes(int length);
-
-    public int readInt2();
-
-    public int readInt3();
-
-    public int readInt4();
-
-    public long readLong4();
-
-    public long readLong8();
-
-    public String readString();
-
-    public String readStringNullTerminated();
-
-    public IpAddress readIpAddress();
+    public T read(DataBlockReader<K> reader) throws IOException;
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/DataWriter.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/DataWriter.java	Thu Nov 23 13:21:01 2017 +0100
@@ -1,33 +1,13 @@
 package com.passus.st.reader;
 
-import com.passus.net.IpAddress;
+import java.io.IOException;
 
 /**
  *
  * @author Mirosław Hawrot
  */
-public interface DataWriter {
-
-    public void writeLength(long length);
-
-    public void writeByte(byte value);
-
-    public void writeBytes(byte[] value);
-
-    public void writeInt2(int value);
+public interface DataWriter<T, K> {
 
-    public void writeInt3(int value);
-
-    public void writeInt4(int value);
-
-    public void writeLong4(long value);
-
-    public void writeLong8(long value);
-
-    public void writeString(String value);
-
-    public void writeStringNullTerminated(String value);
-
-    public void writeIpAddress(IpAddress ipAddress);
+    public void write(T object, DataBlockWriter<K> writer) throws IOException;
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Thu Nov 23 13:21:01 2017 +0100
@@ -3,6 +3,7 @@
 import com.passus.commons.Assert;
 import com.passus.data.ByteBuff;
 import com.passus.data.DataHelper;
+import com.passus.data.HeapByteBuff;
 import com.passus.net.IpAddress;
 import com.passus.net.session.SessionBean;
 import com.passus.st.emitter.SessionInfo;
@@ -87,6 +88,8 @@
 
         try {
             ch = FileChannel.open(path, StandardOpenOption.READ);
+            System.out.println("size: " + ch.size());
+            buffer = new HeapByteBuff(bufferSize);
             nioBuffer = ByteBuffer.allocate(bufferSize);
             nioBuffers = new ByteBuffer[1];
             nioBuffers[0] = nioBuffer;
@@ -125,41 +128,52 @@
         ch.position(ch.position() + length);
     }
 
-    private void read(int requiredBytes) throws IOException {
+    private int read(int requiredBytes) throws IOException {
         if (eof) {
-            return;
+            return -1;
         }
 
-        if (buffer.readableBytes() <= requiredBytes) {
-            return;
+        if (buffer.readableBytes() >= requiredBytes) {
+            return -1;
         }
 
         if (buffer.isEmpty() && buffer.readedBytes() > 0) {
             buffer.clear();
         }
 
-        while (buffer.readableBytes() >= requiredBytes) {
+        int readed = 0;
+        while (buffer.readableBytes() < requiredBytes) {
             int res = ch.read(nioBuffer);
             if (res == -1) {
                 eof = true;
-                break;
+                return -1;
             }
+            
             nioBuffer.flip();
+            readed += nioBuffer.remaining();
             buffer.append(nioBuffer);
-            buffer.clear();
+            nioBuffer.clear();
         }
+
+        return readed;
     }
 
     @Override
     public void reset() throws IOException {
         header = null;
+        ch.position(NcHeader.SIZE);
+        buffer.clear();
+        nioBuffer.clear();
+    }
 
+    public long position() throws IOException {
+        return ch.position() - buffer.readedBytes();
     }
 
     private void readHeader() throws IOException {
         read(NcHeader.SIZE);
 
-        if (!NcHeader.isPreambule(buffer.buffer(), nioBuffer.arrayOffset())) {
+        if (!NcHeader.isPreambule(buffer.buffer(), buffer.startIndex())) {
             throw new IOException("Invalid preambule.");
         }
         buffer.skipBytes(NcHeader.PREAMBULE.length);
@@ -192,7 +206,7 @@
 
         long totalSize = dataHelper.readLong8(buffer);
         long prevTotalSize = dataHelper.readLong8(buffer);
-        int blocksNum = dataHelper.readInt4(buffer);
+        int blocksNum = dataHelper.readInt2(buffer);
         return new NcSegmentBlock(totalSize, prevTotalSize, blocksNum);
     }
 
@@ -217,7 +231,7 @@
         IpAddress serverIp = ncDataHelper.readIpAddress(buffer);
         int serverPort = dataHelper.readInt2(buffer);
 
-        SessionInfo sessionInfo = new SessionInfo(serverIp, serverPort, clientIp, clientPort, serverPort);
+        SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport);
         sessionInfo.setSourceName(sourceName);
         if (sessionIdMap.put(sessionId, sessionInfo) != null) {
             throw new IOException("Multiple session info block for sessionId '" + sessionId + "'.");
@@ -245,7 +259,7 @@
         return new NcSessionStatusBlock(timestamp, sessionId, session, status);
     }
 
-    private NcSessionPayloadBlock readSessionPayloadBlock(boolean skipBlockType) throws IOException {
+    private NcSessionPayloadBlock readSessionPayloadBlockHeader(boolean skipBlockType) throws IOException {
         read(NcSessionPayloadBlock.MAX_HEADER_SIZE);
 
         if (!skipBlockType) {
@@ -259,47 +273,92 @@
         if (session == null) {
             throw new IOException("Invalid session id '" + sessionId + "'.");
         }
-        long totalSize = dataHelper.readLong8(buffer);
+        // totalSize is not read from the header here (the block below is created with -1): long totalSize = dataHelper.readLong8(buffer);
         int proto = dataHelper.readInt2(buffer);
         long dataLength = ncDataHelper.readLongVLC(buffer);
         long optionsLength = ncDataHelper.readLongVLC(buffer);
 
-        //TODO Odczyt danych
-        Map<String, Object> options = null;
-        if (optionsLength != 0) {
-            throw new RuntimeException("optionsLength != 0 - not implemented");
+        return new NcSessionPayloadBlock(timestamp, sessionId, session, -1, proto, dataLength, optionsLength, null, null);
+    }
+
+    private int readSessionPayloadBlockContent(ByteBuff data, int length) throws IOException {
+        int readed = buffer.read(data, length);
+        while (readed < length) {
+            int res = read(bufferSize);
+            if (res == -1) {
+                throw new IOException("Unable to read full content. EOF reached.");
+            }
+
+            readed += buffer.read(data, length - readed);
         }
 
-        return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, proto, dataLength, optionsLength, null, options);
+        return readed;
     }
 
-    public int peekNextBlockType() throws IOException {
+    private NcSessionPayloadBlock readSessionPayloadBlock(boolean skipBlockType, boolean skipData, boolean skipOptions) throws IOException {
+        NcSessionPayloadBlock payloadBlock = readSessionPayloadBlockHeader(skipBlockType);
+
+        if (!skipData) {
+            if (payloadBlock.dataLength() <= Integer.MAX_VALUE) {
+                int dataLength = (int) payloadBlock.dataLength();
+                ByteBuff data = new HeapByteBuff(dataLength);
+                readSessionPayloadBlockContent(data, dataLength);
+                payloadBlock.data(data);
+            } else {
+                long startPosition = position();
+                long endPosition = startPosition + payloadBlock.dataLength();
+                ReadonlyFileChannelDataSource ds = new ReadonlyFileChannelDataSource(ch, path, startPosition, endPosition);
+                payloadBlock.data(ds);
+            }
+        }
+
+        Map<String, Object> options = null;
+        if (!skipOptions && payloadBlock.optionsLength() != 0) {
+            throw new RuntimeException("optionsLength != 0 - not implemented yet");
+        }
+
+        return payloadBlock;
+    }
+
+    public int peekBlockType() throws IOException {
         read(1);
         return buffer.get();
     }
 
+    public NcSessionStatusBlock readSessionStatusBlock() throws IOException {
+        return readSessionStatusBlock(false);
+    }
+
+    public NcSessionPayloadBlock readSessionPayloadBlockHeader() throws IOException {
+        return readSessionPayloadBlockHeader(false);
+    }
+
+    public int readSessionPayloadContent(ByteBuffer data, long length) {
+
+        return 0;
+    }
+
     @Override
     public NcDataBlock read() throws IOException {
         checkOpened();
+        if (eof && buffer.isEmpty()) {
+            return null;
+        }
 
         read(1);
         byte blockType = buffer.read();
         switch (blockType) {
             case NcSegmentBlock.TYPE:
-                readSegmentBlock(true);
-                break;
+                return readSegmentBlock(true);
             case NcSessionInfoBlock.TYPE:
-                readSessionInfoBlock(true);
-                break;
+                return readSessionInfoBlock(true);
             case NcSessionStatusBlock.TYPE:
-                readSessionStatusBlock(true);
-                break;
+                return readSessionStatusBlock(true);
             case NcSessionPayloadBlock.TYPE:
-                readSessionPayloadBlock(true);
-                break;
+                return readSessionPayloadBlock(true, false, false);
+            default:
+                throw new IOException("Unsupported block type '" + blockType + "'.");
         }
-
-        return null;
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Thu Nov 23 13:21:01 2017 +0100
@@ -294,7 +294,6 @@
         buffer.putShort((short) sessionInfo.getDstPort());
         closeBlock(NcSessionInfoBlock.TYPE, false);
         buffer.putInt(sizePosition, size);
-
         incrementSegmentBlocks(false);
         updateSegmentInfo(size);
 
@@ -317,7 +316,7 @@
         updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE);
     }
 
-    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException {
+    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataLength) throws IOException {
         checkOpened();
 
         int sessionId = getSessionId(session, opened);
@@ -325,11 +324,15 @@
         buffer.put(NcSessionPayloadBlock.TYPE);
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
-        buffer.put(proto);
 
-        int size = 14;
+        //int totalSizePosition = buffer.position();
+        //buffer.putLong(0L);
+        buffer.putShort((short) proto);
+
+        int size = 18;
         size += ncDataHelper.writeLongVLC(dataLength, buffer);
         size += ncDataHelper.writeLongVLC(0, buffer); // OptionsLength
+        //buffer.putLong(totalSizePosition, size);
 
         updateSegmentInfo(size);
         writeBuffer();
@@ -388,13 +391,14 @@
         closeBlock(NcSessionPayloadBlock.TYPE);
     }
 
-    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException {
+    public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data) throws IOException {
         checkOpened();
 
         ByteBuffer dataBuffer = wrapData(data);
         int dataSize = dataBuffer.remaining();
         writeSessionPayloadHeader(timestamp, session, proto, dataSize);
-        write(dataBuffer);
+        ch.write(dataBuffer);
+        
         updateSegmentInfo(dataSize);
         closeBlock(NcSessionPayloadBlock.TYPE);
     }
@@ -403,7 +407,21 @@
     public void write(NcDataBlock block) throws IOException {
         checkOpened();
 
-        throw new RuntimeException("Not supported yet.");
+        switch (block.type()) {
+            case NcSegmentBlock.TYPE:
+            case NcSessionInfoBlock.TYPE:
+                throw new IllegalArgumentException("Only session status or session payload block should be written.");
+            case NcSessionStatusBlock.TYPE:
+                NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) block;
+                writeSessionStatus(statusBlock.timestamp(), statusBlock.sessionInfo(), statusBlock.status());
+                break;
+            case NcSessionPayloadBlock.TYPE:
+                NcSessionPayloadBlock sessionPayload = (NcSessionPayloadBlock) block;
+                writeSessionPayload(sessionPayload.timestamp(), sessionPayload.sessionInfo(), sessionPayload.proto(), sessionPayload.data());
+                break;
+            default:
+                throw new IllegalArgumentException("Not supported block '" + block.type() + "'.");
+        }
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java	Thu Nov 23 13:21:01 2017 +0100
@@ -216,11 +216,8 @@
     }
 
     public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer, boolean sanitize) {
-        if (buffer.writableBytes() < value.length()) {
-            buffer.ensureCapacity(value.length());
-        }
-
-        int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.startIndex(), buffer.capacity(), sanitize);
+        buffer.ensureCapacity(value.length());
+        int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.endIndex(), buffer.capacity(), sanitize);
         buffer.endIndex(buffer.endIndex() + len);
         return len;
     }
@@ -264,7 +261,7 @@
             int startIndex = buffer.startIndex();
             int length = index - startIndex;
             String value = buffer.toString(startIndex, length, CHARSET);
-            buffer.skipBytes(length);
+            buffer.skipBytes(length + 1);
             return value;
         }
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/ReadonlyFileChannelDataSource.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/ReadonlyFileChannelDataSource.java	Thu Nov 23 13:21:01 2017 +0100
@@ -15,7 +15,9 @@
  */
 public class ReadonlyFileChannelDataSource implements DataSource {
 
-    private final FileChannel ch;
+    private FileChannel ch;
+
+    private final Path path;
 
     private long startPosition = -1;
 
@@ -25,6 +27,8 @@
 
     private byte[] data;
 
+    private boolean opened = false;
+
     public ReadonlyFileChannelDataSource(File file) throws IOException {
         this(file.toPath());
     }
@@ -34,18 +38,18 @@
     }
 
     public ReadonlyFileChannelDataSource(Path path) throws IOException {
-        this(FileChannel.open(path, StandardOpenOption.READ));
+        this(FileChannel.open(path, StandardOpenOption.READ), path);
     }
 
     public ReadonlyFileChannelDataSource(Path path, long startPosition, long endPosition) throws IOException {
-        this(FileChannel.open(path, StandardOpenOption.READ), startPosition, endPosition);
+        this(FileChannel.open(path, StandardOpenOption.READ), path, startPosition, endPosition);
     }
 
-    public ReadonlyFileChannelDataSource(FileChannel ch) throws IOException {
-        this(ch, 0, ch.size());
+    public ReadonlyFileChannelDataSource(FileChannel ch, Path path) throws IOException {
+        this(ch, path, 0, ch.size());
     }
 
-    public ReadonlyFileChannelDataSource(FileChannel ch, long startPosition, long endPosition) {
+    public ReadonlyFileChannelDataSource(FileChannel ch, Path path, long startPosition, long endPosition) {
         Assert.greaterOrEqualZero(startPosition, "startPosition");
         Assert.greaterOrEqualZero(endPosition, "endPosition");
         Assert.notNull(ch, "channel");
@@ -54,6 +58,7 @@
         }
 
         this.ch = ch;
+        this.path = path;
         this.startPosition = startPosition;
         this.endPosition = endPosition;
     }
@@ -69,12 +74,24 @@
     }
 
     @Override
-    public void open() {
-
+    public void open() throws IOException {
+        if (!ch.isOpen()) {
+            ch = FileChannel.open(path, StandardOpenOption.READ);
+            opened = true;
+        }
     }
 
     @Override
     public void close() throws IOException {
+        if (opened) {
+            try {
+                ch.close();
+            } catch (Exception ignore) {
+            }
+
+            opened = false;
+        }
+
         dataBb = null;
         data = null;
     }
@@ -115,13 +132,13 @@
         }
 
         ByteBuffer dataBb = (this.data == data ? this.dataBb : ByteBuffer.wrap(data));
-        
+
         int available = available();
         int limit = Math.min(offset + length, dataBb.capacity());
-        if( limit > available ) {
+        if (limit > available) {
             limit = available;
         }
-        
+
         dataBb.limit(limit);
         dataBb.position(offset);
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java	Thu Nov 23 13:21:01 2017 +0100
@@ -41,4 +41,12 @@
         this.sessionId = sessionId;
     }
 
+    public SessionInfo sessionInfo() {
+        return sessionInfo;
+    }
+
+    public void sessionInfo(SessionInfo sessionInfo) {
+        this.sessionInfo = sessionInfo;
+    }
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Thu Nov 23 13:21:01 2017 +0100
@@ -0,0 +1,96 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.reader.nc.block.NcDataBlock;
+import com.passus.st.reader.nc.block.NcSegmentBlock;
+import com.passus.st.reader.nc.block.NcSessionInfoBlock;
+import com.passus.st.reader.nc.block.NcSessionPayloadBlock;
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.UUID;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertSame;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.Test;
+
+/**
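+ * Round-trip test: writes a session payload with NcDataBlockWriter and reads the blocks back with NcDataBlockReader.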
+ * @author Mirosław Hawrot
+ */
+public class NcDataBlockReaderTest {
+
+    private final SessionInfo session;
+
+    private final byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+    public NcDataBlockReaderTest() throws ParseException {
+        session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+    }
+
+    private File createTmpFile() throws IOException {
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st");
+        if (!tmpDir.exists()) {
+            tmpDir.mkdir();
+        }
+
+        return new File(tmpDir, "st_" + UUID.randomUUID().toString() + ".tmp");
+    }
+
+    private File writeData() throws Exception {
+        File tmpFile = createTmpFile();
+
+        long time = Long.MAX_VALUE;
+        try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
+            writer.open();
+            writer.writeSessionPayload(time, session, (byte) 100, data);
+        }
+
+        return tmpFile;
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        File file = null;
+
+        try {
+            file = writeData();
+            try (NcDataBlockReader reader = new NcDataBlockReader(file)) {
+                reader.open();
+
+                NcDataBlock block = reader.read();
+                assertEquals(NcSegmentBlock.TYPE, block.type());
+                NcSegmentBlock firstSegmentBlock = (NcSegmentBlock) block;
+                assertEquals(2, firstSegmentBlock.blocksNum());
+
+                block = reader.read();
+                assertEquals(NcSessionInfoBlock.TYPE, block.type());
+                NcSessionInfoBlock sessionInfoBlock = (NcSessionInfoBlock) block;
+                assertEquals(session, sessionInfoBlock.sessionInfo());
+
+                block = reader.read();
+                assertEquals(NcSessionPayloadBlock.TYPE, block.type());
+                NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block;
+                assertSame(sessionInfoBlock.sessionInfo(), payloadBlock.sessionInfo());
+                assertEquals(data.length, payloadBlock.dataLength());
+                assertEquals(0, payloadBlock.optionsLength());
+                assertTrue(payloadBlock.data() instanceof ByteBuff);
+                ByteBuff payload = (ByteBuff) payloadBlock.data();
+                assertEquals(data, payload.toArray());
+                
+                /*block = reader.read();
+                assertEquals(NcSegmentBlock.TYPE, block.type());
+                NcSegmentBlock lastSegmentBlock = (NcSegmentBlock) block;
+                assertEquals(0, lastSegmentBlock.blocksNum());
+                assertEquals(0, lastSegmentBlock.totalSize());
+                assertEquals(firstSegmentBlock.totalSize(), lastSegmentBlock.prevTotalSize());*/
+            }
+        } finally {
+            if (file != null) {
+                file.delete();
+            }
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java	Wed Nov 22 15:40:56 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java	Thu Nov 23 13:21:01 2017 +0100
@@ -1,5 +1,6 @@
 package com.passus.st.reader.nc;
 
+import com.passus.commons.AsciiUtils;
 import com.passus.data.ByteBuff;
 import com.passus.data.HeapByteBuff;
 import java.nio.ByteBuffer;
@@ -14,7 +15,7 @@
  * @author Mirosław Hawrot
  */
 public class NcDataHelperTest {
-    
+
     private static final byte B_FF = (byte) 0xff;
     private static final byte B_80 = (byte) 0x80;
 
@@ -104,4 +105,43 @@
         helper.writeLongVLC(length, result);
         assertEquals(expectedResult, result);
     }
+
+    @Test
+    public void testWriteStringNullTerminated() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+
+        helper.writeStringNullTerminated("", buffer);
+        buffer.flip();
+        assertEquals(1, buffer.remaining());
+        assertEquals(AsciiUtils.NUL, buffer.get());
+        buffer.clear();
+
+        helper.writeStringNullTerminated(null, buffer);
+        buffer.flip();
+        assertEquals(1, buffer.remaining());
+        assertEquals(AsciiUtils.NUL, buffer.get());
+        buffer.clear();
+        
+        helper.writeStringNullTerminated("123", buffer);
+        buffer.flip();
+        assertEquals(4, buffer.remaining());
+        assertEquals(0x31, buffer.get());
+        assertEquals(0x32, buffer.get());
+        assertEquals(0x33, buffer.get());
+        assertEquals(AsciiUtils.NUL, buffer.get());
+
+    }
+
+    @Test
+    public void testReadStringNullTerminated() {
+        ByteBuff buffer = new HeapByteBuff(new byte[]{0x31, 0x32, 0x33, AsciiUtils.NUL});
+        String res = helper.readStringNullTerminated(buffer);
+        assertEquals("123", res);
+        assertEquals(0, buffer.readableBytes());
+
+        buffer = new HeapByteBuff(new byte[]{AsciiUtils.NUL});
+        res = helper.readStringNullTerminated(buffer);
+        assertEquals("", res);
+        assertEquals(0, buffer.readableBytes());
+    }
 }