Mercurial > stress-tester
changeset 739:3fa7d471145f
Nc file - options support
author | Devel 2 |
---|---|
date | Wed, 06 Dec 2017 09:54:50 +0100 |
parents | 8e37b4b2d695 |
children | bd686b6618b1 |
files | stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java |
diffstat | 5 files changed, 153 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java Wed Dec 06 09:54:50 2017 +0100
@@ -10,4 +10,5 @@
     public byte type();
+    public long totalSize();
 }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Dec 05 15:24:49 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Wed Dec 06 09:54:50 2017 +0100 @@ -199,7 +199,6 @@ return sessionId; } - private void writeHeader() throws IOException { buffer.put(NcHeader.PREAMBULE); buffer.put(NcDataHelper.VERSION_MAJOR); @@ -218,7 +217,7 @@ writeBuffer(); currentSegmentsPayloadBlocks = 0; - currentSegmentSessionInfoBlocks = 0; + currentSegmentSessionInfoBlocks = 0; } private void closeSegmentBlock() throws IOException { @@ -325,7 +324,7 @@ ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp()); buffer.putShort((short) sessionInfo.getDstPort()); closeBlock(NcSessionInfoBlock.TYPE, false); - + int size = buffer.position() - startPos; buffer.putInt(sizePosition, size); currentSegmentSessionInfoBlocks++; @@ -384,7 +383,7 @@ buffer.putShort((short) proto); writeBuffer(); updateSegmentInfo(payloadBlockTotalSize); - + currentBlockStage = BLOCK_STAGE_CONTENT_WRITE; } @@ -476,6 +475,10 @@ } public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data) throws IOException { + writeSessionPayload(timestamp, session, proto, data, null); + } + + public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data, List<Option> options) throws IOException { checkOpened(); ByteBuffer dataBuffer = wrapData(data); @@ -484,6 +487,11 @@ ch.write(dataBuffer); updateSessionPayloadSize(dataSize, true); updateSegmentInfo(dataSize); + + if (options != null) { + writeSessionPayloadOptions(options); + } + closeBlock(NcSessionPayloadBlock.TYPE); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java Tue Dec 05 15:24:49 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java Wed Dec 06 09:54:50 2017 +0100
@@ -32,7 +32,8 @@
         return TYPE;
     }
-    public int totalSize() {
+    @Override
+    public long totalSize() {
         return totalSize;
     }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java Tue Dec 05 15:24:49 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionStatusBlock.java Wed Dec 06 09:54:50 2017 +0100 @@ -7,32 +7,37 @@ * @author Mirosław Hawrot */ public class NcSessionStatusBlock extends NcSessionBlock { - + public static final byte TYPE = 4; - + public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 2; - + private byte status; - + public NcSessionStatusBlock() { } - + public NcSessionStatusBlock(long timestamp, int sessionId, SessionInfo sessionInfo, byte status) { super(timestamp, sessionId, sessionInfo); this.status = status; } - + @Override public byte type() { return TYPE; } - + + @Override + public long totalSize() { + return HEADER_SIZE; + } + public byte status() { return status; } - + public void status(byte status) { this.status = status; } - + }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Dec 05 15:24:49 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Wed Dec 06 09:54:50 2017 +0100 @@ -1,15 +1,17 @@ package com.passus.st.reader.nc; +import com.passus.data.ByteBuff; import com.passus.st.emitter.SessionInfo; +import com.passus.st.reader.nc.option.Option; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.apache.commons.io.FileUtils; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; import org.testng.annotations.Test; /** @@ -20,6 +22,12 @@ private final NcDataHelper dataHelper = NcDataHelper.getInstance(); + private final SessionInfo session; + + public NcDataBlockWriterTest() throws Exception { + session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); + } + private File createTmpFile() throws IOException { File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st"); if (!tmpDir.exists()) { @@ -29,13 +37,84 @@ return new File(tmpDir, "st_" + UUID.randomUUID().toString() + ".tmp"); } + private BlocksStats assertBlocks(List<NcDataBlock> blocks, byte[] payloadData, List<Option> payloadOptions) { + BlocksStats stats = new BlocksStats(); + NcSegmentBlock lastSegment = null; + int currentSegmentBlocksNum = 0; + long currentSegmentTotalSize = 0; + int currentSessionId = -1; + int segmentSessionId = -1; + for (int i = 0; i < blocks.size(); i++) { + boolean lastBlock = (i == (blocks.size() - 1)); + NcDataBlock block = blocks.get(i); + if (block.type() == NcSegmentBlock.TYPE) { + NcSegmentBlock segmentBlock = (NcSegmentBlock) block; + stats.segmentsCount++; + if (lastSegment != null) { + assertEquals(lastSegment.totalSize(), segmentBlock.prevTotalSize()); + assertEquals(lastSegment.totalSize(), 
currentSegmentTotalSize + NcSegmentBlock.HEADER_SIZE); + assertEquals(lastSegment.blocksNum(), currentSegmentBlocksNum); + } + + if (lastBlock) { + assertEquals(0, segmentBlock.blocksNum()); + } + + lastSegment = segmentBlock; + currentSegmentBlocksNum = 0; + currentSegmentTotalSize = 0; + segmentSessionId = 0; + } else if (block.type() == NcSessionPayloadBlock.TYPE) { + NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block; + assertEquals(session, payloadBlock.sessionInfo()); + byte[] data = ((ByteBuff) payloadBlock.data()).toArray(); + assertEquals(payloadData, data); + assertEquals(NcSessionPayloadBlock.HEADER_SIZE + data.length, payloadBlock.totalSize() - payloadBlock.optionsSize()); + assertEquals(data.length, payloadBlock.dataSize()); + + List<Option> options = payloadBlock.options(); + if (payloadOptions == null) { + assertTrue(options == null); + } else { + assertTrue(options != null); + assertEquals(payloadOptions.size(), options.size()); + for (int j = 0; j < payloadOptions.size(); j++) { + Option opt1 = payloadOptions.get(j); + Option opt2 = options.get(j); + assertEquals(opt1.getName(), opt2.getName()); + assertEquals(opt1.getValue(), opt2.getValue()); + } + } + + stats.payloadsCount++; + currentSegmentBlocksNum++; + currentSegmentTotalSize += payloadBlock.totalSize(); + } else if (block.type() == NcSessionInfoBlock.TYPE) { + segmentSessionId++; + NcSessionInfoBlock sessionInfoBlock = (NcSessionInfoBlock) block; + assertEquals(session, sessionInfoBlock.sessionInfo()); + assertEquals(segmentSessionId, sessionInfoBlock.sessionId()); + currentSegmentBlocksNum++; + currentSegmentTotalSize += sessionInfoBlock.totalSize(); + } else if (block.type() == NcSessionStatusBlock.TYPE) { + NcSessionStatusBlock sessionStatusBlocks = (NcSessionStatusBlock) block; + assertEquals(session, sessionStatusBlocks.sessionInfo()); + currentSegmentBlocksNum++; + currentSegmentTotalSize += sessionStatusBlocks.totalSize(); + } else { + fail("Unknown block type '" + 
block.type() + "'."); + } + } + + return stats; + } + @Test - public void testWrite_WritePayload() throws Exception { + public void testWrite_PayloadNoOption() throws Exception { File tmpFile = createTmpFile(); int dataProto = 100; long time = Long.MAX_VALUE; - SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); session.setSourceName("test"); byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; try { @@ -52,9 +131,8 @@ assertEquals(NcDataHelper.VERSION_MINOR, content[offset++]); offset += 8; - long startOffset = offset; long currentSegmentSize = content.length - - NcHeader.SIZE + - NcHeader.SIZE - NcSegmentBlock.HEADER_SIZE; //SegmentBlock @@ -65,7 +143,7 @@ offset += 8; int segmentBlocks = dataHelper.getInt2(content, offset); offset += 2; - + long lastSegmentSize = segmentSize; assertEquals(currentSegmentSize, segmentSize); assertEquals(0, prevSegmentSize); @@ -93,7 +171,7 @@ assertEquals(data.length, dataSize); offset += dataSize; - + //SegmentBlock assertEquals(NcSegmentBlock.TYPE, content[offset++]); segmentSize = dataHelper.getLong8(content, offset); @@ -112,12 +190,11 @@ } } - @Test(enabled = false) - public void testWrite_WriteManyPayloads() throws Exception { + @Test + public void testWrite_ManyPayloadsNoOption() throws Exception { File tmpFile = createTmpFile(); long time = Long.MAX_VALUE; - SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; int payloadsNum = 100; int maxSegmentBlocks = 10; @@ -141,37 +218,51 @@ } } - long segmentsCount = 0; - long payloadsCount = 0; - NcSegmentBlock lastSegment = null; - for (int i = 0; i < blocks.size(); i++) { - boolean lastBlock = (i == (blocks.size() - 1)); - NcDataBlock block = blocks.get(i); - if (block.type() == NcSegmentBlock.TYPE) { - NcSegmentBlock segmentBlock = (NcSegmentBlock) block; - segmentsCount++; - if (lastSegment != null) { - assertEquals(lastSegment.totalSize(), segmentBlock.prevTotalSize()); - } + BlocksStats stats = 
assertBlocks(blocks, data, null); + assertEquals(11, stats.segmentsCount); + assertEquals(payloadsNum, stats.payloadsCount); + } finally { + tmpFile.delete(); + } + } - if (lastBlock) { - assertEquals(0, segmentBlock.blocksNum()); - } else { - assertEquals(maxSegmentBlocks + 1, segmentBlock.blocksNum()); - } + @Test + public void testWrite_PayloadWithOptions() throws Exception { + File tmpFile = createTmpFile(); + try { + byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; + long time = Long.MAX_VALUE; + List<Option> options = new ArrayList<>(); + options.add(new Option("test", 1)); + try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { + writer.open(); + writer.writeSessionPayload(time, session, (byte) 100, data, options); + } - lastSegment = segmentBlock; - } else if (block.type() == NcSessionPayloadBlock.TYPE) { - payloadsCount++; + List<NcDataBlock> blocks = new ArrayList<>(); + try (NcDataBlockReader reader = new NcDataBlockReader(tmpFile)) { + reader.open(); + + NcDataBlock block; + while ((block = reader.read()) != null) { + blocks.add(block); } } - assertEquals(11, segmentsCount); - assertEquals(payloadsNum, payloadsCount); - + assertEquals(4, blocks.size()); + BlocksStats stats = assertBlocks(blocks, data, options); + assertEquals(2, stats.segmentsCount); + assertEquals(1, stats.payloadsCount); } finally { tmpFile.delete(); } + } + + private static class BlocksStats { + + public int segmentsCount = 0; + + public int payloadsCount = 0; } }