changeset 1220:6f6f6c8c790e

NcDataBlockReader, NcDataBlockWriter - protocolId moved to sessionInfoBlock
author Devel 2
date Thu, 25 Jun 2020 09:43:58 +0200
parents b1c1adbf6538
children 5d7393e2cf94
files stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java stress-tester/src/main/java/com/passus/st/source/NcPayloadWriter.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 7 files changed, 24 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Jun 25 09:43:58 2020 +0200
@@ -193,9 +193,6 @@
                 }
             }
         }
-
-        System.out.println("SynchFlowWorker.sessions: " + sessions.size());
-        System.out.println("SynchFlowWorker.eventsQueue: " + eventsQueue.size());
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Thu Jun 25 09:43:58 2020 +0200
@@ -268,12 +268,14 @@
             throw new IOException("Not supported session transport '" + transport + "'.");
         }
 
+        byte protocolId = buffer.read();
+
         IpAddress clientIp = ncDataHelper.readIpAddress(buffer);
         int clientPort = ncDataHelper.readInt2(buffer);
         IpAddress serverIp = ncDataHelper.readIpAddress(buffer);
         int serverPort = ncDataHelper.readInt2(buffer);
 
-        SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport);
+        SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport, protocolId);
         sessionInfo.setSourceName(sourceName);
         if (sessionIdMap.put(sessionId, sessionInfo) != null) {
             throw new IOException("Multiple session info block for sessionId '" + sessionId + "'.");
@@ -318,9 +320,8 @@
 
         long totalSize = ncDataHelper.readLong4(buffer);
         long dataSize = ncDataHelper.readLong4(buffer);
-        int proto = ncDataHelper.readInt2(buffer);
 
-        return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, dataSize, proto, null, null);
+        return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, dataSize, null, null);
     }
 
     private int readSessionPayloadBlockContent(ByteBuff data, int length) throws IOException {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Thu Jun 25 09:43:58 2020 +0200
@@ -319,6 +319,7 @@
         buffer.putInt(sessionId);
         ncDataHelper.writeStringNullTerminated(buffer, sessionInfo.getSourceName());
         buffer.put((byte) sessionInfo.getTransport());
+        buffer.put((byte) sessionInfo.getProtocolId());
         ncDataHelper.writeIpAddress(buffer, sessionInfo.getSrcIp());
         buffer.putShort((short) sessionInfo.getSrcPort());
         ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp());
@@ -366,7 +367,7 @@
         writeBuffer(payloadBlockSizePos);
     }
 
-    public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto) throws IOException {
+    public void writeSessionPayloadHeader(long timestamp, SessionInfo session) throws IOException {
         checkOpened();
 
         int sessionId = getSessionId(session, true);
@@ -380,7 +381,6 @@
         payloadBlockSizePos = ch.position() + NcSessionBlock.HEADER_SIZE;
         ncDataHelper.writeLong4(buffer, payloadBlockTotalSize);
         ncDataHelper.writeLong4(buffer, 0);
-        buffer.putShort((short) proto);
         writeBuffer();
         updateSegmentInfo(payloadBlockTotalSize);
 
@@ -474,16 +474,16 @@
         closeBlock(NcSessionPayloadBlock.TYPE);
     }
 
-    public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data) throws IOException {
-        writeSessionPayload(timestamp, session, proto, data, null);
+    public void writeSessionPayload(long timestamp, SessionInfo session, Object data) throws IOException {
+        writeSessionPayload(timestamp, session, data, null);
     }
 
-    public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data, List<Option> options) throws IOException {
+    public void writeSessionPayload(long timestamp, SessionInfo session, Object data, List<Option> options) throws IOException {
         checkOpened();
 
         ByteBuffer dataBuffer = wrapData(data);
         int dataSize = dataBuffer.remaining();
-        writeSessionPayloadHeader(timestamp, session, proto);
+        writeSessionPayloadHeader(timestamp, session);
         ch.write(dataBuffer);
         updateSessionPayloadSize(dataSize, true);
         updateSegmentInfo(dataSize);
@@ -509,7 +509,7 @@
                 break;
             case NcSessionPayloadBlock.TYPE:
                 NcSessionPayloadBlock sessionPayload = (NcSessionPayloadBlock) block;
-                writeSessionPayload(sessionPayload.timestamp(), sessionPayload.sessionInfo(), sessionPayload.proto(), sessionPayload.data());
+                writeSessionPayload(sessionPayload.timestamp(), sessionPayload.sessionInfo(), sessionPayload.data());
                 break;
             default:
                 throw new IllegalArgumentException("Not supported block '" + block.type() + "'.");
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java	Thu Jun 25 09:43:58 2020 +0200
@@ -3,7 +3,6 @@
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.nc.option.Option;
 import java.util.List;
-import java.util.Map;
 
 /**
  *
@@ -13,7 +12,7 @@
 
     public static final byte TYPE = 3;
 
-    public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 10;
+    public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 8;
 
     private long totalSize;
 
@@ -21,8 +20,6 @@
 
     private int optionsSize;
 
-    private int proto;
-
     private Object data;
 
     private List<Option> options;
@@ -31,13 +28,11 @@
     }
 
     public NcSessionPayloadBlock(long timestamp, int sessionId, SessionInfo sessionInfo,
-            long totalSize, long dataSize, int proto,
-            Object data, List<Option> options) {
+            long totalSize, long dataSize, Object data, List<Option> options) {
         super(timestamp, sessionId, sessionInfo);
         this.totalSize = totalSize;
         this.dataSize = dataSize;
         this.optionsSize = (int) (totalSize - HEADER_SIZE - dataSize);
-        this.proto = proto;
         this.options = options;
         this.data = data;
     }
@@ -55,14 +50,6 @@
         this.totalSize = totalSize;
     }
 
-    public int proto() {
-        return proto;
-    }
-
-    public void proto(int proto) {
-        this.proto = proto;
-    }
-
     public long dataSize() {
         return dataSize;
     }
--- a/stress-tester/src/main/java/com/passus/st/source/NcPayloadWriter.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcPayloadWriter.java	Thu Jun 25 09:43:58 2020 +0200
@@ -58,7 +58,7 @@
                 writeResponse(resp, respBuffer);
             }
 
-            writer.writeSessionPayloadHeader(timestamp, session, (byte) session.getProtocolId());
+            writer.writeSessionPayloadHeader(timestamp, session);
             writer.writeSessionPayloadData(new byte[]{flags});
             writer.writeSessionPayloadData(reqBuffer);
             writer.writeSessionPayloadData(respBuffer);
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java	Thu Jun 25 09:43:58 2020 +0200
@@ -41,7 +41,7 @@
         long time = Long.MAX_VALUE;
         try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
             writer.open();
-            writer.writeSessionPayload(time, session, (byte) 100, data);
+            writer.writeSessionPayload(time, session, data);
         }
 
         return tmpFile;
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Wed Jun 24 15:50:00 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Thu Jun 25 09:43:58 2020 +0200
@@ -3,19 +3,18 @@
 import com.passus.data.ByteBuff;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.nc.option.Option;
+import org.apache.commons.io.FileUtils;
+import org.testng.annotations.Test;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import org.apache.commons.io.FileUtils;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertTrue;
-import static org.testng.AssertJUnit.fail;
-import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.*;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class NcDataBlockWriterTest {
@@ -120,7 +119,7 @@
         try {
             try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
                 writer.open();
-                writer.writeSessionPayload(time, session, (byte) dataProto, data);
+                writer.writeSessionPayload(time, session, data);
             }
 
             int offset = 0;
@@ -152,7 +151,7 @@
             //SessionInfoBlock
             assertEquals(NcSessionInfoBlock.TYPE, content[offset]);
             long totalSize = dataHelper.getInt4(content, offset + 1);
-            assertEquals(29, totalSize);
+            assertEquals(30, totalSize);
             offset += totalSize;
 
             //SessionPayloadBlock
@@ -163,10 +162,7 @@
             offset += 4;
             long dataSize = dataHelper.getLong4(content, offset);
             offset += 4;
-            int proto = dataHelper.getInt2(content, offset);
-            offset += 2;
 
-            assertEquals(dataProto, proto);
             assertEquals(NcSessionPayloadBlock.HEADER_SIZE + data.length, totalSize);
             assertEquals(data.length, dataSize);
 
@@ -204,7 +200,7 @@
                 writer.setMaxSegmentBlocks(maxSegmentBlocks);
                 int countDown = payloadsNum;
                 do {
-                    writer.writeSessionPayload(time, session, (byte) 100, data);
+                    writer.writeSessionPayload(time, session, data);
                 } while (--countDown > 0);
             }
 
@@ -236,7 +232,7 @@
             options.add(new Option("test", 1));
             try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) {
                 writer.open();
-                writer.writeSessionPayload(time, session, (byte) 100, data, options);
+                writer.writeSessionPayload(time, session, data, options);
             }
 
             List<NcDataBlock> blocks = new ArrayList<>();