changeset 674:b88d3359c1c2

NC file in progress
author Devel 2
date Mon, 20 Nov 2017 15:15:56 +0100
parents 87bb120e72d7
children 1002a35b0cdd
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 8 files changed, 104 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Mon Nov 20 15:15:56 2017 +0100
@@ -7,6 +7,10 @@
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.DataBlockWriter;
 import com.passus.st.reader.nc.block.NcDataBlock;
+import com.passus.st.reader.nc.block.NcSegmentBlock;
+import com.passus.st.reader.nc.block.NcSessionInfoBlock;
+import com.passus.st.reader.nc.block.NcSessionPayloadBlock;
+import com.passus.st.reader.nc.block.NcSessionStatusBlock;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,10 +43,14 @@
 
     private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS;
 
+    private long currentSegmentSizePos = -1;
+
     private int currentSegmentsBlocks = -1;
 
     private long currentSegmentSize = -1;
 
+    private boolean updateSegmentSizeOnClose = true;
+
     private final Map<SessionInfo, Integer> sessionIdMap = new HashMap<>();
 
     private ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
@@ -75,6 +83,12 @@
         buff.clear();
     }
 
+    /**
+     * Flips {@code buff} and writes its content at the absolute file offset
+     * {@code position}, then clears the buffer so it can be refilled.
+     *
+     * @param buff buffer filled in write mode; it is flipped here before being drained
+     * @param position absolute offset in the file at which the first byte is written
+     * @throws IOException if the channel write fails
+     */
+    private void write(ByteBuffer buff, long position) throws IOException {
+        buff.flip();
+        // A single positional write may transfer fewer bytes than remain in the
+        // buffer, so keep writing until it is fully drained.
+        long offset = position;
+        while (buff.hasRemaining()) {
+            offset += ch.write(buff, offset);
+        }
+        buff.clear();
+    }
+
     private void write(ByteBuffer[] buffers) throws IOException {
         for (ByteBuffer buff : buffers) {
             buff.flip();
@@ -89,6 +103,10 @@
         write(buffer);
     }
 
+    private void writeBuffer(long position) throws IOException {
+        write(buffer, position);
+    }
+
     public void flush() throws IOException {
         ch.force(true);
     }
@@ -98,6 +116,7 @@
         try {
             ch = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
             writeHeader();
+            openSegmentBlock();
             opened = true;
         } catch (IOException e) {
             close();
@@ -108,6 +127,8 @@
     @Override
     public void close() throws IOException {
         if (ch != null) {
+            closeSegmentBlock();
+
             try {
                 flush();
                 ch.close();
@@ -133,12 +154,6 @@
         return sessionId;
     }
 
-    private void resetSegmentInfo() {
-        currentSegmentsBlocks = -1;
-        currentSegmentSize = -1;
-        sessionIdMap.clear();
-    }
-
     private int writeHeader() throws IOException {
         buffer.clear();
         buffer.put(NcHeader.PREAMBULE);
@@ -149,11 +164,47 @@
         return NcHeader.SIZE;
     }
 
-    private void writeSegmentBlock(long position) throws IOException {
+    /**
+     * Starts a new segment block: writes the block type tag followed by a zeroed
+     * 8-byte size placeholder, and records the file offset of that placeholder so
+     * the real size can be patched in later.
+     *
+     * @throws IOException if reading the channel position or writing the header fails
+     */
+    private void openSegmentBlock() throws IOException {
+        // Start from an empty buffer so that only the segment header is emitted here.
+        buffer.clear();
+        long blockStart = ch.position();
+        buffer.put(NcSegmentBlock.TYPE);
+        buffer.putLong(0L);
+        // Flush immediately: other writers clear the shared buffer before filling
+        // it, which would drop a header that was only staged and never written.
+        write(buffer);
+        // The size field sits right after the one-byte type tag.
+        currentSegmentSizePos = blockStart + 1;
+
         currentSegmentsBlocks = 0;
         currentSegmentSize = 0;
     }
 
+    private void closeSegmentBlock() throws IOException {
+        if (currentSegmentSizePos == -1) {
+            throw new IllegalStateException("Unable to update segment size. Segment not created.");
+        }
+
+        updateSegmentInfo(-1, true);
+        currentSegmentSizePos = -1;
+        currentSegmentsBlocks = -1;
+        currentSegmentSize = -1;
+        sessionIdMap.clear();
+    }
+
+    private void updateSegmentSize(long size) throws IOException {
+        updateSegmentInfo(size, false);
+    }
+
+    /**
+     * Adds {@code size} to the running segment size and, when due, rewrites the
+     * size field of the current segment header in place.
+     * <p>
+     * With {@code updateSegmentSizeOnClose} set, the field is written only while
+     * the segment is being closed; otherwise it is rewritten on every call.
+     *
+     * @param size number of bytes to add; negative values leave the total unchanged
+     * @param close {@code true} when called while closing the segment
+     * @throws IOException if writing the size field fails
+     * @throws IllegalStateException if no segment is currently open
+     */
+    private void updateSegmentInfo(long size, boolean close) throws IOException {
+        if (currentSegmentSizePos == -1) {
+            throw new IllegalStateException("Unable to update segment size. Segment not created.");
+        }
+
+        if (size >= 0) {
+            currentSegmentSize += size;
+        }
+
+        // The flag defers the write to close time. The earlier condition
+        // (updateSegmentSizeOnClose && !close) did the opposite: it never wrote on
+        // close, and never wrote at all when the flag was off.
+        boolean persistNow = close || !updateSegmentSizeOnClose;
+        if (persistNow) {
+            buffer.clear();
+            buffer.putLong(currentSegmentSize);
+            writeBuffer(currentSegmentSizePos);
+        }
+    }
+
     private int writeSessionInfoBlock(SessionInfo sessionInfo) {
         int size = sessionIdMap.size();
         int sessionId = size + 1;
@@ -162,6 +213,7 @@
             throw new IllegalStateException("Session block '" + sessionInfo + "' already exists.");
         }
 
+        buffer.put(NcSessionInfoBlock.TYPE);
         buffer.putInt(sessionId);
         dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer);
         buffer.put((byte) sessionInfo.getTransport());
@@ -176,6 +228,7 @@
         checkOpened();
 
         int sessionId = getSessionId(session, true);
+        buffer.put(NcSessionStatusBlock.TYPE);
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(status);
@@ -188,6 +241,7 @@
 
         int sessionId = getSessionId(session, opened);
         buffer.clear();
+        buffer.put(NcSessionPayloadBlock.TYPE);
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(proto);
@@ -224,15 +278,24 @@
     public void writeSessionPayloadContent(Object data) throws IOException {
         checkOpened();
         ByteBuffer dataBuffer = wrapData(data);
+        int dataSize = dataBuffer.remaining();
         write(dataBuffer);
+        updateSegmentSize(dataSize);
     }
 
     public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException {
         checkOpened();
 
         ByteBuffer dataBuffer = wrapData(data);
-        writeSessionPayloadHeader(timestamp, session, proto, dataBuffer.remaining());
+        int dataSize = dataBuffer.remaining();
+        writeSessionPayloadHeader(timestamp, session, proto, dataSize);
         write(dataBuffer);
+        updateSegmentSize(dataSize);
+    }
+
+    /**
+     * Closes the current session payload block.
+     * <p>
+     * Currently a stub: it runs the open-state check and does nothing else.
+     *
+     * @throws IOException declared for parity with the other write methods, so
+     *         callers are not forced to handle a raw {@code Exception}
+     */
+    public void closeSessionPayloadBlock() throws IOException {
+        checkOpened();
+
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -6,6 +6,6 @@
  */
 public interface NcDataBlock {
 
-    public int getType();
+    public byte getType();
 
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -6,13 +6,21 @@
  */
 public class NcSegmentBlock implements NcDataBlock {
 
-    public static final int TYPE = 1;
+    public static final byte TYPE = 1;
 
     private long totalSize;
 
     @Override
-    public int getType() {
+    public byte getType() {
         return TYPE;
     }
 
+    public long totalSize() {
+        return totalSize;
+    }
+
+    public void totalSize(long totalSize) {
+        this.totalSize = totalSize;
+    }
+
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -4,7 +4,7 @@
  *
  * @author Mirosław Hawrot
  */
-public abstract class NcSessionBlock {
+public abstract class NcSessionBlock implements NcDataBlock {
 
     private long timestamp;
 
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -6,7 +6,9 @@
  *
  * @author Mirosław Hawrot
  */
-public class NcSessionInfoBlock {
+public class NcSessionInfoBlock implements NcDataBlock {
+
+    public static final byte TYPE = 2;
 
     private int sessionId;
 
@@ -20,6 +22,11 @@
 
     private int serverPort;
 
+    @Override
+    public byte getType() {
+        return TYPE;
+    }
+
     public int sessionId() {
         return sessionId;
     }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -6,9 +6,7 @@
  */
 public class NcSessionPayloadBlock extends NcSessionBlock {
 
-    public static final byte FLAG_ENCODED = 1;
-
-    private byte flags;
+    public static final byte TYPE = 3;
 
     private int proto;
 
@@ -18,12 +16,9 @@
 
     private int optionsLength = 0;
 
-    public byte flags() {
-        return flags;
-    }
-
-    public void flags(byte flags) {
-        this.flags = flags;
+    @Override
+    public byte getType() {
+        return TYPE;
     }
 
     public int proto() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java	Mon Nov 20 15:15:56 2017 +0100
@@ -6,8 +6,15 @@
  */
 public class NcSessionStatusBlock extends NcSessionBlock {
 
+    public static final byte TYPE = 4;
+
     private byte status;
 
+    @Override
+    public byte getType() {
+        return TYPE;
+    }
+
     public byte status() {
         return status;
     }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Mon Nov 20 14:20:20 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Mon Nov 20 15:15:56 2017 +0100
@@ -45,10 +45,11 @@
 
         long time = Long.MAX_VALUE;
         SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80");
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9};
         try {
             try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
                 writer.open();
-                writer.writeSessionPayload(time, session, (byte) 100, null);
+                writer.writeSessionPayload(time, session, (byte) 100, data);
             }
 
             byte[] content = FileUtils.readFileToByteArray(tmpFile);