changeset 706:b6da092613be

NC file in progress
author Devel 2
date Wed, 29 Nov 2017 09:47:53 +0100
parents 122560887a6a
children 490739826756
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSegmentBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 4 files changed, 88 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Tue Nov 28 15:31:17 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Wed Nov 29 09:47:53 2017 +0100
@@ -213,6 +213,7 @@
             checkBlockType(blockType, NcSegmentBlock.TYPE);
         }
 
+        sessionIdMap.clear();
         long totalSize = ncDataHelper.readLong8(buffer);
         long prevTotalSize = ncDataHelper.readLong8(buffer);
         int blocksNum = ncDataHelper.readInt2(buffer);
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Nov 28 15:31:17 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Wed Nov 29 09:47:53 2017 +0100
@@ -55,7 +55,9 @@
 
     private long currentSegmentInfoPos = -1;
 
-    private int currentSegmentsBlocks = -1;
+    private int currentSegmentSessionInfoBlocks = -1;
+    
+    private int currentSegmentsPayloadBlocks = -1;
 
     private long currentSegmentSize = -1;
 
@@ -201,7 +203,8 @@
         buffer.putShort((short) 0);
         writeBuffer();
 
-        currentSegmentsBlocks = 0;
+        currentSegmentsPayloadBlocks = 0;
+        currentSegmentSessionInfoBlocks = 0;
         currentSegmentSize = 0;
     }
 
@@ -210,13 +213,17 @@
             throw new IllegalStateException("Unable to update segment size. Segment not created.");
         }
 
+        boolean openNew = currentSegmentsPayloadBlocks > 0;
         updateSegmentInfo(-1, true);
         prevSegmentSize = currentSegmentSize;
         currentSegmentInfoPos = -1;
-        currentSegmentsBlocks = -1;
+        currentSegmentsPayloadBlocks = -1;
+        currentSegmentSessionInfoBlocks = -1;
         currentSegmentSize = -1;
         sessionIdMap.clear();
-        openSegmentBlock();
+        if (openNew) {
+            openSegmentBlock();
+        }
     }
 
     private void updateSegmentInfo(long size) throws IOException {
@@ -236,7 +243,7 @@
             buffer.clear();
             buffer.putLong(currentSegmentSize);
             buffer.putLong(prevSegmentSize);
-            buffer.putShort((short) currentSegmentsBlocks);
+            buffer.putShort((short) (currentSegmentsPayloadBlocks + currentSegmentSessionInfoBlocks));
             writeBuffer(currentSegmentInfoPos);
         }
     }
@@ -273,9 +280,9 @@
     }
 
     private void incrementSegmentBlocks(boolean closeSegmentIfNeeded) throws IOException {
-        currentSegmentsBlocks++;
+        currentSegmentsPayloadBlocks++;
 
-        if (closeSegmentIfNeeded && currentSegmentsBlocks >= maxSegmentBlocks) {
+        if (closeSegmentIfNeeded && currentSegmentsPayloadBlocks >= maxSegmentBlocks) {
             closeSegmentBlock();
         }
     }
@@ -302,7 +309,7 @@
         buffer.putShort((short) sessionInfo.getDstPort());
         closeBlock(NcSessionInfoBlock.TYPE, false);
         buffer.putInt(sizePosition, size);
-        incrementSegmentBlocks(false);
+        currentSegmentSessionInfoBlocks++;
         updateSegmentInfo(size);
 
         return sessionId;
@@ -327,7 +334,7 @@
     public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataSize) throws IOException {
         checkOpened();
 
-        int sessionId = getSessionId(session, opened);
+        int sessionId = getSessionId(session, true);
         openBlock(NcSessionPayloadBlock.TYPE);
         buffer.put(NcSessionPayloadBlock.TYPE);
         buffer.putLong(timestamp);
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSegmentBlock.java	Tue Nov 28 15:31:17 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSegmentBlock.java	Wed Nov 29 09:47:53 2017 +0100
@@ -54,4 +54,9 @@
         this.blocksNum = blocksNum;
     }
 
+    @Override
+    public String toString() {
+        return "NcSegmentBlock{" + "totalSize=" + totalSize + ", prevTotalSize=" + prevTotalSize + ", blocksNum=" + blocksNum + '}';
+    }
+
 }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Tue Nov 28 15:31:17 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Wed Nov 29 09:47:53 2017 +0100
@@ -4,6 +4,8 @@
 import com.passus.st.emitter.SessionInfo;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import static org.testng.AssertJUnit.assertEquals;
@@ -67,4 +69,68 @@
             tmpFile.delete();
         }
     }
+
+    @Test
+    public void testWrite_WriteManyPayloads() throws Exception {
+        File tmpFile = createTmpFile();
+
+        long time = Long.MAX_VALUE;
+        SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+        int payloadsNum = 100;
+        int maxSegmentBlocks = 10;
+        try {
+
+            try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
+                writer.open();
+                writer.setMaxSegmentBlocks(maxSegmentBlocks);
+                int countDown = payloadsNum;
+                do {
+                    writer.writeSessionPayload(time, session, (byte) 100, data);
+                } while (--countDown > 0);
+            }
+
+            List<NcDataBlock> blocks = new ArrayList<>();
+            try (NcDataBlockReader reader = new NcDataBlockReader(tmpFile)) {
+                reader.open();
+
+                NcDataBlock block;
+                while ((block = reader.read()) != null) {
+                    blocks.add(block);
+                }
+            }
+
+            long segmentsCount = 0;
+            long payloadsCount = 0;
+            NcSegmentBlock lastSegment = null;
+            for (int i = 0; i < blocks.size(); i++) {
+                boolean lastBlock = (i == (blocks.size() - 1));
+                NcDataBlock block = blocks.get(i);
+                if (block.type() == NcSegmentBlock.TYPE) {
+                    NcSegmentBlock segmentBlock = (NcSegmentBlock) block;
+                    segmentsCount++;
+                    if (lastSegment != null) {
+                        assertEquals(lastSegment.totalSize(), segmentBlock.prevTotalSize());
+                    }
+
+                    if (lastBlock) {
+                        assertEquals(0, segmentBlock.blocksNum());
+                    } else {
+                        assertEquals(maxSegmentBlocks + 1, segmentBlock.blocksNum());
+                    }
+
+                    lastSegment = segmentBlock;
+                } else if (block.type() == NcSessionPayloadBlock.TYPE) {
+                    payloadsCount++;
+                }
+            }
+
+            assertEquals(11, segmentsCount);
+            assertEquals(payloadsNum, payloadsCount);
+
+        } finally {
+            tmpFile.delete();
+        }
+
+    }
 }