changeset 739:3fa7d471145f

Nc file - options support
author Devel 2
date Wed, 06 Dec 2017 09:54:50 +0100
parents 8e37b4b2d695
children bd686b6618b1
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 5 files changed, 153 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java	Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java	Wed Dec 06 09:54:50 2017 +0100
@@ -10,4 +10,5 @@
     
     public byte type();
 
+    public long totalSize();
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Wed Dec 06 09:54:50 2017 +0100
@@ -199,7 +199,6 @@
         return sessionId;
     }
 
-    
     private void writeHeader() throws IOException {
         buffer.put(NcHeader.PREAMBULE);
         buffer.put(NcDataHelper.VERSION_MAJOR);
@@ -218,7 +217,7 @@
         writeBuffer();
 
         currentSegmentsPayloadBlocks = 0;
-        currentSegmentSessionInfoBlocks = 0;        
+        currentSegmentSessionInfoBlocks = 0;
     }
 
     private void closeSegmentBlock() throws IOException {
@@ -325,7 +324,7 @@
         ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp());
         buffer.putShort((short) sessionInfo.getDstPort());
         closeBlock(NcSessionInfoBlock.TYPE, false);
-        
+
         int size = buffer.position() - startPos;
         buffer.putInt(sizePosition, size);
         currentSegmentSessionInfoBlocks++;
@@ -384,7 +383,7 @@
         buffer.putShort((short) proto);
         writeBuffer();
         updateSegmentInfo(payloadBlockTotalSize);
-        
+
         currentBlockStage = BLOCK_STAGE_CONTENT_WRITE;
     }
 
@@ -476,6 +475,10 @@
     }
 
     public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data) throws IOException {
+        writeSessionPayload(timestamp, session, proto, data, null);
+    }
+
+    public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data, List<Option> options) throws IOException {
         checkOpened();
 
         ByteBuffer dataBuffer = wrapData(data);
@@ -484,6 +487,11 @@
         ch.write(dataBuffer);
         updateSessionPayloadSize(dataSize, true);
         updateSegmentInfo(dataSize);
+
+        if (options != null) {
+            writeSessionPayloadOptions(options);
+        }
+
         closeBlock(NcSessionPayloadBlock.TYPE);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java	Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java	Wed Dec 06 09:54:50 2017 +0100
@@ -32,7 +32,8 @@
         return TYPE;
     }
 
-    public int totalSize() {
+    @Override
+    public long totalSize() {
         return totalSize;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java	Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java	Wed Dec 06 09:54:50 2017 +0100
@@ -7,32 +7,37 @@
  * @author Mirosław Hawrot
  */
 public class NcSessionStatusBlock extends NcSessionBlock {
-    
+
     public static final byte TYPE = 4;
-    
+
     public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 2;
-    
+
     private byte status;
-    
+
     public NcSessionStatusBlock() {
     }
-    
+
     public NcSessionStatusBlock(long timestamp, int sessionId, SessionInfo sessionInfo, byte status) {
         super(timestamp, sessionId, sessionInfo);
         this.status = status;
     }
-    
+
     @Override
     public byte type() {
         return TYPE;
     }
-    
+
+    @Override
+    public long totalSize() {
+        return HEADER_SIZE;
+    }
+
     public byte status() {
         return status;
     }
-    
+
     public void status(byte status) {
         this.status = status;
     }
-    
+
 }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Wed Dec 06 09:54:50 2017 +0100
@@ -1,15 +1,17 @@
 package com.passus.st.reader.nc;
 
+import com.passus.data.ByteBuff;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.reader.nc.option.Option;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
 import org.testng.annotations.Test;
 
 /**
@@ -20,6 +22,12 @@
 
     private final NcDataHelper dataHelper = NcDataHelper.getInstance();
 
+    private final SessionInfo session;
+
+    public NcDataBlockWriterTest() throws Exception {
+        session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+    }
+
     private File createTmpFile() throws IOException {
         File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st");
         if (!tmpDir.exists()) {
@@ -29,13 +37,84 @@
         return new File(tmpDir, "st_" + UUID.randomUUID().toString() + ".tmp");
     }
 
+    private BlocksStats assertBlocks(List<NcDataBlock> blocks, byte[] payloadData, List<Option> payloadOptions) {
+        BlocksStats stats = new BlocksStats();
+        NcSegmentBlock lastSegment = null;
+        int currentSegmentBlocksNum = 0;
+        long currentSegmentTotalSize = 0;
+        int currentSessionId = -1;
+        int segmentSessionId = -1;
+        for (int i = 0; i < blocks.size(); i++) {
+            boolean lastBlock = (i == (blocks.size() - 1));
+            NcDataBlock block = blocks.get(i);
+            if (block.type() == NcSegmentBlock.TYPE) {
+                NcSegmentBlock segmentBlock = (NcSegmentBlock) block;
+                stats.segmentsCount++;
+                if (lastSegment != null) {
+                    assertEquals(lastSegment.totalSize(), segmentBlock.prevTotalSize());
+                    assertEquals(lastSegment.totalSize(), currentSegmentTotalSize + NcSegmentBlock.HEADER_SIZE);
+                    assertEquals(lastSegment.blocksNum(), currentSegmentBlocksNum);
+                }
+
+                if (lastBlock) {
+                    assertEquals(0, segmentBlock.blocksNum());
+                }
+
+                lastSegment = segmentBlock;
+                currentSegmentBlocksNum = 0;
+                currentSegmentTotalSize = 0;
+                segmentSessionId = 0;
+            } else if (block.type() == NcSessionPayloadBlock.TYPE) {
+                NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block;
+                assertEquals(session, payloadBlock.sessionInfo());
+                byte[] data = ((ByteBuff) payloadBlock.data()).toArray();
+                assertEquals(payloadData, data);
+                assertEquals(NcSessionPayloadBlock.HEADER_SIZE + data.length, payloadBlock.totalSize() - payloadBlock.optionsSize());
+                assertEquals(data.length, payloadBlock.dataSize());
+
+                List<Option> options = payloadBlock.options();
+                if (payloadOptions == null) {
+                    assertTrue(options == null);
+                } else {
+                    assertTrue(options != null);
+                    assertEquals(payloadOptions.size(), options.size());
+                    for (int j = 0; j < payloadOptions.size(); j++) {
+                        Option opt1 = payloadOptions.get(j);
+                        Option opt2 = options.get(j);
+                        assertEquals(opt1.getName(), opt2.getName());
+                        assertEquals(opt1.getValue(), opt2.getValue());
+                    }
+                }
+
+                stats.payloadsCount++;
+                currentSegmentBlocksNum++;
+                currentSegmentTotalSize += payloadBlock.totalSize();
+            } else if (block.type() == NcSessionInfoBlock.TYPE) {
+                segmentSessionId++;
+                NcSessionInfoBlock sessionInfoBlock = (NcSessionInfoBlock) block;
+                assertEquals(session, sessionInfoBlock.sessionInfo());
+                assertEquals(segmentSessionId, sessionInfoBlock.sessionId());
+                currentSegmentBlocksNum++;
+                currentSegmentTotalSize += sessionInfoBlock.totalSize();
+            } else if (block.type() == NcSessionStatusBlock.TYPE) {
+                NcSessionStatusBlock sessionStatusBlocks = (NcSessionStatusBlock) block;
+                assertEquals(session, sessionStatusBlocks.sessionInfo());
+                currentSegmentBlocksNum++;
+                currentSegmentTotalSize += sessionStatusBlocks.totalSize();
+            } else {
+                fail("Unknown block type '" + block.type() + "'.");
+            }
+        }
+
+        return stats;
+    }
+
     @Test
-    public void testWrite_WritePayload() throws Exception {
+    public void testWrite_PayloadNoOption() throws Exception {
         File tmpFile = createTmpFile();
 
         int dataProto = 100;
         long time = Long.MAX_VALUE;
-        SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
         session.setSourceName("test");
         byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
         try {
@@ -52,9 +131,8 @@
             assertEquals(NcDataHelper.VERSION_MINOR, content[offset++]);
             offset += 8;
 
-            long startOffset = offset;
             long currentSegmentSize = content.length
-                    - NcHeader.SIZE 
+                    - NcHeader.SIZE
                     - NcSegmentBlock.HEADER_SIZE;
 
             //SegmentBlock
@@ -65,7 +143,7 @@
             offset += 8;
             int segmentBlocks = dataHelper.getInt2(content, offset);
             offset += 2;
-            
+
             long lastSegmentSize = segmentSize;
             assertEquals(currentSegmentSize, segmentSize);
             assertEquals(0, prevSegmentSize);
@@ -93,7 +171,7 @@
             assertEquals(data.length, dataSize);
 
             offset += dataSize;
-            
+
             //SegmentBlock
             assertEquals(NcSegmentBlock.TYPE, content[offset++]);
             segmentSize = dataHelper.getLong8(content, offset);
@@ -112,12 +190,11 @@
         }
     }
 
-    @Test(enabled = false)
-    public void testWrite_WriteManyPayloads() throws Exception {
+    @Test
+    public void testWrite_ManyPayloadsNoOption() throws Exception {
         File tmpFile = createTmpFile();
 
         long time = Long.MAX_VALUE;
-        SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
         byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
         int payloadsNum = 100;
         int maxSegmentBlocks = 10;
@@ -141,37 +218,51 @@
                 }
             }
 
-            long segmentsCount = 0;
-            long payloadsCount = 0;
-            NcSegmentBlock lastSegment = null;
-            for (int i = 0; i < blocks.size(); i++) {
-                boolean lastBlock = (i == (blocks.size() - 1));
-                NcDataBlock block = blocks.get(i);
-                if (block.type() == NcSegmentBlock.TYPE) {
-                    NcSegmentBlock segmentBlock = (NcSegmentBlock) block;
-                    segmentsCount++;
-                    if (lastSegment != null) {
-                        assertEquals(lastSegment.totalSize(), segmentBlock.prevTotalSize());
-                    }
+            BlocksStats stats = assertBlocks(blocks, data, null);
+            assertEquals(11, stats.segmentsCount);
+            assertEquals(payloadsNum, stats.payloadsCount);
+        } finally {
+            tmpFile.delete();
+        }
+    }
 
-                    if (lastBlock) {
-                        assertEquals(0, segmentBlock.blocksNum());
-                    } else {
-                        assertEquals(maxSegmentBlocks + 1, segmentBlock.blocksNum());
-                    }
+    @Test
+    public void testWrite_PayloadWithOptions() throws Exception {
+        File tmpFile = createTmpFile();
+        try {
+            byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+            long time = Long.MAX_VALUE;
+            List<Option> options = new ArrayList<>();
+            options.add(new Option("test", 1));
+            try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
+                writer.open();
+                writer.writeSessionPayload(time, session, (byte) 100, data, options);
+            }
 
-                    lastSegment = segmentBlock;
-                } else if (block.type() == NcSessionPayloadBlock.TYPE) {
-                    payloadsCount++;
+            List<NcDataBlock> blocks = new ArrayList<>();
+            try (NcDataBlockReader reader = new NcDataBlockReader(tmpFile)) {
+                reader.open();
+
+                NcDataBlock block;
+                while ((block = reader.read()) != null) {
+                    blocks.add(block);
                 }
             }
 
-            assertEquals(11, segmentsCount);
-            assertEquals(payloadsNum, payloadsCount);
-
+            assertEquals(4, blocks.size());
+            BlocksStats stats = assertBlocks(blocks, data, options);
+            assertEquals(2, stats.segmentsCount);
+            assertEquals(1, stats.payloadsCount);
         } finally {
             tmpFile.delete();
         }
+    }
+
+    private static class BlocksStats {
+
+        public int segmentsCount = 0;
+
+        public int payloadsCount = 0;
 
     }
 }