changeset 677:ff5f3f3cdf60

NC file in progress
author Devel 2
date Tue, 21 Nov 2017 14:39:48 +0100
parents f1547b730293
children 77d613d85107
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 4 files changed, 159 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Nov 21 10:43:31 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Nov 21 14:39:48 2017 +0100
@@ -31,7 +31,13 @@
 
     private static final int DEFAULT_BYTE_BUFFER = 64 * 1024;
 
-    private NcDataHelper dataHelper = NcDataHelper.getInstance();
+    private static final int BLOCK_STAGE_HEADER_WRITE = 1;
+
+    private static final int BLOCK_STAGE_CONTENT_WRITE = 2;
+
+    private static final int MAX_SEGMENT_BLOCKS = Short.MAX_VALUE;
+
+    private final NcDataHelper dataHelper = NcDataHelper.getInstance();
 
     private final Path path;
 
@@ -41,9 +47,13 @@
 
     private int bufferSize = DEFAULT_BYTE_BUFFER;
 
+    private int currentBlockType = -1;
+
+    private int currentBlockStage = -1;
+
     private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS;
 
-    private long currentSegmentSizePos = -1;
+    private long currentSegmentInfoPos = -1;
 
     private int currentSegmentsBlocks = -1;
 
@@ -53,7 +63,7 @@
 
     private final Map<SessionInfo, Integer> sessionIdMap = new HashMap<>();
 
-    private ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    private final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
 
     public NcDataBlockWriter(String fileName) {
         this.path = Paths.get(fileName);
@@ -74,9 +84,21 @@
 
     public void setMaxSegmentBlocks(int maxSegmentBlocks) {
         Assert.greaterThanZero(maxSegmentBlocks, "maxSegmentBlocks");
+        if (maxSegmentBlocks > MAX_SEGMENT_BLOCKS) {
+            throw new IllegalArgumentException("maxSegmentBlocks should be less or equal than " + MAX_SEGMENT_BLOCKS + ".");
+        }
+
         this.maxSegmentBlocks = maxSegmentBlocks;
     }
 
+    public boolean isUpdateSegmentSizeOnClose() {
+        return updateSegmentSizeOnClose;
+    }
+
+    public void setUpdateSegmentSizeOnClose(boolean updateSegmentSizeOnClose) {
+        this.updateSegmentSizeOnClose = updateSegmentSizeOnClose;
+    }
+
     private void write(ByteBuffer buff) throws IOException {
         buff.flip();
         ch.write(buff);
@@ -128,9 +150,9 @@
     public void close() throws IOException {
         if (ch != null) {
             closeSegmentBlock();
+            flush();
 
             try {
-                flush();
                 ch.close();
             } catch (Exception ignore) {
             }
@@ -145,7 +167,7 @@
         }
     }
 
-    private int getSessionId(SessionInfo session, boolean writeIfNeeded) {
+    private int getSessionId(SessionInfo session, boolean writeIfNeeded) throws IOException {
         Integer sessionId = sessionIdMap.get(session);
         if (writeIfNeeded && sessionId == null) {
             sessionId = writeSessionInfoBlock(session);
@@ -154,43 +176,43 @@
         return sessionId;
     }
 
-    private int writeHeader() throws IOException {
-        buffer.clear();
+    private void writeHeader() throws IOException {
         buffer.put(NcHeader.PREAMBULE);
         buffer.put(NcDataHelper.VERSION_MAJOR);
         buffer.put(NcDataHelper.VERSION_MINOR);
         buffer.putLong(0L);
         writeBuffer();
-        return NcHeader.SIZE;
     }
 
     private void openSegmentBlock() throws IOException {
+        currentSegmentInfoPos = ch.position() + 1;
         buffer.put(NcSegmentBlock.TYPE);
-        currentSegmentSizePos = ch.position();
         buffer.putLong(0L);
+        buffer.putShort((short) 0);
+        writeBuffer();
 
         currentSegmentsBlocks = 0;
         currentSegmentSize = 0;
     }
 
     private void closeSegmentBlock() throws IOException {
-        if (currentSegmentSizePos == -1) {
+        if (currentSegmentInfoPos == -1) {
             throw new IllegalStateException("Unable to update segment size. Segment not created.");
         }
 
         updateSegmentInfo(-1, true);
-        currentSegmentSizePos = -1;
+        currentSegmentInfoPos = -1;
         currentSegmentsBlocks = -1;
         currentSegmentSize = -1;
         sessionIdMap.clear();
     }
 
-    private void updateSegmentSize(long size) throws IOException {
+    private void updateSegmentInfo(long size) throws IOException {
         updateSegmentInfo(size, false);
     }
 
     private void updateSegmentInfo(long size, boolean close) throws IOException {
-        if (currentSegmentSizePos == -1) {
+        if (currentSegmentInfoPos == -1) {
             throw new IllegalStateException("Unable to update segment size. Segment not created.");
         }
 
@@ -198,29 +220,76 @@
             currentSegmentSize += size;
         }
 
-        if (updateSegmentSizeOnClose && !close) {
+        if (close || !updateSegmentSizeOnClose) {
             buffer.clear();
             buffer.putLong(currentSegmentSize);
-            writeBuffer(currentSegmentSizePos);
+            buffer.putShort((short) currentSegmentsBlocks);
+            writeBuffer(currentSegmentInfoPos);
         }
     }
 
-    private int writeSessionInfoBlock(SessionInfo sessionInfo) {
-        int size = sessionIdMap.size();
-        int sessionId = size + 1;
+    private void openBlock(int blockType) throws IOException {
+        if (currentBlockType != -1 && currentBlockType != blockType) {
+            throw new IllegalStateException("Unable to open block '" + blockType + "' "
+                    + " another block '" + currentBlockType + "' is opened.");
+        }
+
+        this.currentBlockType = blockType;
+        this.currentBlockStage = BLOCK_STAGE_HEADER_WRITE;
+    }
+
+    private void closeBlock(int blockType) throws IOException {
+        closeBlock(blockType, true);
+    }
+
+    private void closeBlock(int blockType, boolean incrementSegmentBlocks) throws IOException {
+        if (currentBlockType != blockType) {
+            throw new IllegalStateException("Unable to close block '" + blockType + "' "
+                    + " another block '" + currentBlockType + "' is opened.");
+        }
+
+        this.currentBlockType = -1;
+        this.currentBlockStage = -1;
+        if (incrementSegmentBlocks) {
+            incrementSegmentBlocks();
+        }
+    }
+
+    private void incrementSegmentBlocks() throws IOException {
+        incrementSegmentBlocks(true);
+    }
+
+    private void incrementSegmentBlocks(boolean closeSegmentIfNeeded) throws IOException {
+        currentSegmentsBlocks++;
+
+        if (closeSegmentIfNeeded && currentSegmentsBlocks >= maxSegmentBlocks) {
+            closeSegmentBlock();
+        }
+    }
+
+    private int writeSessionInfoBlock(SessionInfo sessionInfo) throws IOException {
+        int sessionId = sessionIdMap.size() + 1;
 
         if (sessionIdMap.put(sessionInfo, sessionId) != null) {
             throw new IllegalStateException("Session block '" + sessionInfo + "' already exists.");
         }
 
+        openBlock(NcSessionInfoBlock.TYPE);
+        int size = 10;
         buffer.put(NcSessionInfoBlock.TYPE);
         buffer.putInt(sessionId);
-        dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer);
+        size += dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer);
         buffer.put((byte) sessionInfo.getTransport());
-        dataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer);
+        size += dataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer);
         buffer.putShort((short) sessionInfo.getSrcPort());
-        dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer);
+        size += dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer);
         buffer.putShort((short) sessionInfo.getDstPort());
+        closeBlock(NcSessionInfoBlock.TYPE, false);
+
+        incrementSegmentBlocks(false);
+        updateSegmentInfo(size);
+        
+
         return sessionId;
     }
 
@@ -228,26 +297,34 @@
         checkOpened();
 
         int sessionId = getSessionId(session, true);
+
+        openBlock(NcSessionStatusBlock.TYPE);
         buffer.put(NcSessionStatusBlock.TYPE);
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(status);
 
         writeBuffer();
+        closeBlock(NcSessionStatusBlock.TYPE);
+        updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE);
     }
 
     public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException {
         checkOpened();
 
         int sessionId = getSessionId(session, opened);
-        buffer.clear();
+        openBlock(NcSessionPayloadBlock.TYPE);
         buffer.put(NcSessionPayloadBlock.TYPE);
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(proto);
-        dataHelper.writeLongVLC(dataLength, buffer);
 
+        int size = 14;
+        size += dataHelper.writeLongVLC(dataLength, buffer);
+
+        updateSegmentInfo(size);
         writeBuffer();
+        currentBlockStage = BLOCK_STAGE_CONTENT_WRITE;
     }
 
     private ByteBuffer wrapData(Object data) throws IOException {
@@ -277,10 +354,29 @@
 
     public void writeSessionPayloadContent(Object data) throws IOException {
         checkOpened();
+
+        if (currentBlockType != NcSessionPayloadBlock.TYPE) {
+            throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Block is not opened.");
+        } else if (currentBlockStage != BLOCK_STAGE_CONTENT_WRITE) {
+            throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Header is not written.");
+        }
+
         ByteBuffer dataBuffer = wrapData(data);
         int dataSize = dataBuffer.remaining();
         write(dataBuffer);
-        updateSegmentSize(dataSize);
+        updateSegmentInfo(dataSize);
+    }
+
+    public void closeSessionPayloadBlock() throws Exception {
+        checkOpened();
+
+        if (currentBlockType != NcSessionPayloadBlock.TYPE) {
+            throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Invalid block type.");
+        } else if (currentBlockStage != BLOCK_STAGE_CONTENT_WRITE) {
+            throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Header is not written.");
+        }
+
+        closeBlock(NcSessionPayloadBlock.TYPE);
     }
 
     public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException {
@@ -290,12 +386,8 @@
         int dataSize = dataBuffer.remaining();
         writeSessionPayloadHeader(timestamp, session, proto, dataSize);
         write(dataBuffer);
-        updateSegmentSize(dataSize);
-    }
-
-    public void closeSessionPayloadBlock() throws Exception {
-        checkOpened();
-
+        updateSegmentInfo(dataSize);
+        closeBlock(NcSessionPayloadBlock.TYPE);
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java	Tue Nov 21 10:43:31 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java	Tue Nov 21 14:39:48 2017 +0100
@@ -10,6 +10,8 @@
 
     private long totalSize;
 
+    private int blocksNum;
+
     @Override
     public byte getType() {
         return TYPE;
@@ -23,4 +25,12 @@
         this.totalSize = totalSize;
     }
 
+    public int blocksNum() {
+        return blocksNum;
+    }
+
+    public void blocksNum(int blocksNum) {
+        this.blocksNum = blocksNum;
+    }
+
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java	Tue Nov 21 10:43:31 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java	Tue Nov 21 14:39:48 2017 +0100
@@ -8,6 +8,8 @@
 
     public static final byte TYPE = 4;
 
+    public static final int HEADER_SIZE = 14;
+    
     private byte status;
 
     @Override
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Tue Nov 21 10:43:31 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Tue Nov 21 14:39:48 2017 +0100
@@ -1,10 +1,14 @@
 package com.passus.st.reader.nc;
 
+import com.passus.data.DataHelper;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.reader.nc.block.NcSegmentBlock;
+import com.passus.st.reader.nc.block.NcSessionInfoBlock;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
+import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import org.testng.annotations.Test;
 
@@ -14,6 +18,10 @@
  */
 public class NcDataBlockWriterTest {
 
+    private final DataHelper dataHelper = DataHelper.BIG_ENDIAN;
+
+    private final NcDataHelper ncDataHelper = NcDataHelper.getInstance();
+
     private File createTmpFile() throws IOException {
         File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st");
         if (!tmpDir.exists()) {
@@ -24,22 +32,6 @@
     }
 
     @Test
-    public void testWrite_Header() throws Exception {
-        File tmpFile = createTmpFile();
-
-        try {
-            try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
-                writer.open();
-            }
-
-            byte[] content = FileUtils.readFileToByteArray(tmpFile);
-            assertTrue(NcHeader.isPreambule(content, 0));
-        } finally {
-            tmpFile.delete();
-        }
-    }
-
-    @Test
     public void testWrite_WritePayload() throws Exception {
         File tmpFile = createTmpFile();
 
@@ -52,8 +44,24 @@
                 writer.writeSessionPayload(time, session, (byte) 100, data);
             }
 
+            int offset = 0;
             byte[] content = FileUtils.readFileToByteArray(tmpFile);
-            assertTrue(NcHeader.isPreambule(content, 0));
+            assertTrue(NcHeader.isPreambule(content, offset));
+            offset += NcHeader.PREAMBULE.length;
+            assertEquals(NcDataHelper.VERSION_MAJOR, content[offset++]);
+            assertEquals(NcDataHelper.VERSION_MINOR, content[offset++]);
+            offset += 8;
+
+            //SegmentBlock
+            assertEquals(NcSegmentBlock.TYPE, content[offset++]);
+            long segmentSize = dataHelper.getLong8(content, offset);
+            offset += 8;
+            int segmentBlocks = dataHelper.getInt2(content, offset);
+            offset += 2;
+            assertEquals(2, segmentBlocks);
+
+            //SessionInfoBlock
+            assertEquals(NcSessionInfoBlock.TYPE, content[offset++]);
         } finally {
             tmpFile.delete();
         }