Mercurial > stress-tester
changeset 677:ff5f3f3cdf60
NC file in progress
author | Devel 2 |
---|---|
date | Tue, 21 Nov 2017 14:39:48 +0100 |
parents | f1547b730293 |
children | 77d613d85107 |
files | stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java |
diffstat | 4 files changed, 159 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Nov 21 10:43:31 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Nov 21 14:39:48 2017 +0100 @@ -31,7 +31,13 @@ private static final int DEFAULT_BYTE_BUFFER = 64 * 1024; - private NcDataHelper dataHelper = NcDataHelper.getInstance(); + private static final int BLOCK_STAGE_HEADER_WRITE = 1; + + private static final int BLOCK_STAGE_CONTENT_WRITE = 2; + + private static final int MAX_SEGMENT_BLOCKS = Short.MAX_VALUE; + + private final NcDataHelper dataHelper = NcDataHelper.getInstance(); private final Path path; @@ -41,9 +47,13 @@ private int bufferSize = DEFAULT_BYTE_BUFFER; + private int currentBlockType = -1; + + private int currentBlockStage = -1; + private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS; - private long currentSegmentSizePos = -1; + private long currentSegmentInfoPos = -1; private int currentSegmentsBlocks = -1; @@ -53,7 +63,7 @@ private final Map<SessionInfo, Integer> sessionIdMap = new HashMap<>(); - private ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + private final ByteBuffer buffer = ByteBuffer.allocate(bufferSize); public NcDataBlockWriter(String fileName) { this.path = Paths.get(fileName); @@ -74,9 +84,21 @@ public void setMaxSegmentBlocks(int maxSegmentBlocks) { Assert.greaterThanZero(maxSegmentBlocks, "maxSegmentBlocks"); + if (maxSegmentBlocks > MAX_SEGMENT_BLOCKS) { + throw new IllegalArgumentException("maxSegmentBlocks should be less or equal than " + MAX_SEGMENT_BLOCKS + "."); + } + this.maxSegmentBlocks = maxSegmentBlocks; } + public boolean isUpdateSegmentSizeOnClose() { + return updateSegmentSizeOnClose; + } + + public void setUpdateSegmentSizeOnClose(boolean updateSegmentSizeOnClose) { + this.updateSegmentSizeOnClose = updateSegmentSizeOnClose; + } + private void write(ByteBuffer buff) throws IOException { buff.flip(); ch.write(buff); @@ -128,9 +150,9 @@ public void close() throws IOException { if (ch != null) { closeSegmentBlock(); + flush(); try { - flush(); ch.close(); } catch (Exception ignore) { } @@ -145,7 +167,7 @@ } } - private int getSessionId(SessionInfo session, boolean writeIfNeeded) { + private int getSessionId(SessionInfo session, boolean writeIfNeeded) throws IOException { Integer sessionId = sessionIdMap.get(session); if (writeIfNeeded && sessionId == null) { sessionId = writeSessionInfoBlock(session); @@ -154,43 +176,43 @@ return sessionId; } - private int writeHeader() throws IOException { - buffer.clear(); + private void writeHeader() throws IOException { buffer.put(NcHeader.PREAMBULE); buffer.put(NcDataHelper.VERSION_MAJOR); buffer.put(NcDataHelper.VERSION_MINOR); buffer.putLong(0L); writeBuffer(); - return NcHeader.SIZE; } private void openSegmentBlock() throws IOException { + currentSegmentInfoPos = ch.position() + 1; buffer.put(NcSegmentBlock.TYPE); - currentSegmentSizePos = ch.position(); buffer.putLong(0L); + buffer.putShort((short) 0); + writeBuffer(); currentSegmentsBlocks = 0; currentSegmentSize = 0; } private void closeSegmentBlock() throws IOException { - if (currentSegmentSizePos == -1) { + if (currentSegmentInfoPos == -1) { throw new IllegalStateException("Unable to update segment size. Segment not created."); } updateSegmentInfo(-1, true); - currentSegmentSizePos = -1; + currentSegmentInfoPos = -1; currentSegmentsBlocks = -1; currentSegmentSize = -1; sessionIdMap.clear(); } - private void updateSegmentSize(long size) throws IOException { + private void updateSegmentInfo(long size) throws IOException { updateSegmentInfo(size, false); } private void updateSegmentInfo(long size, boolean close) throws IOException { - if (currentSegmentSizePos == -1) { + if (currentSegmentInfoPos == -1) { throw new IllegalStateException("Unable to update segment size. Segment not created."); } @@ -198,29 +220,76 @@ currentSegmentSize += size; } - if (updateSegmentSizeOnClose && !close) { + if (close || !updateSegmentSizeOnClose) { buffer.clear(); buffer.putLong(currentSegmentSize); - writeBuffer(currentSegmentSizePos); + buffer.putShort((short) currentSegmentsBlocks); + writeBuffer(currentSegmentInfoPos); } } - private int writeSessionInfoBlock(SessionInfo sessionInfo) { - int size = sessionIdMap.size(); - int sessionId = size + 1; + private void openBlock(int blockType) throws IOException { + if (currentBlockType != -1 && currentBlockType != blockType) { + throw new IllegalStateException("Unable to open block '" + blockType + "' " + + " another block '" + currentBlockType + "' is opened."); + } + + this.currentBlockType = blockType; + this.currentBlockStage = BLOCK_STAGE_HEADER_WRITE; + } + + private void closeBlock(int blockType) throws IOException { + closeBlock(blockType, true); + } + + private void closeBlock(int blockType, boolean incrementSegmentBlocks) throws IOException { + if (currentBlockType != blockType) { + throw new IllegalStateException("Unable to close block '" + blockType + "' " + + " another block '" + currentBlockType + "' is opened."); + } + + this.currentBlockType = -1; + this.currentBlockStage = -1; + if (incrementSegmentBlocks) { + incrementSegmentBlocks(); + } + } + + private void incrementSegmentBlocks() throws IOException { + incrementSegmentBlocks(true); + } + + private void incrementSegmentBlocks(boolean closeSegmentIfNeeded) throws IOException { + currentSegmentsBlocks++; + + if (closeSegmentIfNeeded && currentSegmentsBlocks >= maxSegmentBlocks) { + closeSegmentBlock(); + } + } + + private int writeSessionInfoBlock(SessionInfo sessionInfo) throws IOException { + int sessionId = sessionIdMap.size() + 1; if (sessionIdMap.put(sessionInfo, sessionId) != null) { throw new IllegalStateException("Session block '" + sessionInfo + "' already exists."); } + openBlock(NcSessionInfoBlock.TYPE); + int size = 10; buffer.put(NcSessionInfoBlock.TYPE); buffer.putInt(sessionId); - dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer); + size += dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer); buffer.put((byte) sessionInfo.getTransport()); - dataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer); + size += dataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer); buffer.putShort((short) sessionInfo.getSrcPort()); - dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer); + size += dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer); buffer.putShort((short) sessionInfo.getDstPort()); + closeBlock(NcSessionInfoBlock.TYPE, false); + + incrementSegmentBlocks(false); + updateSegmentInfo(size); + + return sessionId; } @@ -228,26 +297,34 @@ checkOpened(); int sessionId = getSessionId(session, true); + + openBlock(NcSessionStatusBlock.TYPE); buffer.put(NcSessionStatusBlock.TYPE); buffer.putLong(timestamp); buffer.putInt(sessionId); buffer.put(status); writeBuffer(); + closeBlock(NcSessionStatusBlock.TYPE); + updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE); } public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException { checkOpened(); int sessionId = getSessionId(session, opened); - buffer.clear(); + openBlock(NcSessionPayloadBlock.TYPE); buffer.put(NcSessionPayloadBlock.TYPE); buffer.putLong(timestamp); buffer.putInt(sessionId); buffer.put(proto); - dataHelper.writeLongVLC(dataLength, buffer); + int size = 14; + size += dataHelper.writeLongVLC(dataLength, buffer); + + updateSegmentInfo(size); writeBuffer(); + currentBlockStage = BLOCK_STAGE_CONTENT_WRITE; } private ByteBuffer wrapData(Object data) throws IOException { @@ -277,10 +354,29 @@ public void writeSessionPayloadContent(Object data) throws IOException { checkOpened(); + + if (currentBlockType != NcSessionPayloadBlock.TYPE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Block is not opened."); + } else if (currentBlockStage != BLOCK_STAGE_CONTENT_WRITE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Header is not written."); + } + ByteBuffer dataBuffer = wrapData(data); int dataSize = dataBuffer.remaining(); write(dataBuffer); - updateSegmentSize(dataSize); + updateSegmentInfo(dataSize); + } + + public void closeSessionPayloadBlock() throws Exception { + checkOpened(); + + if (currentBlockType != NcSessionPayloadBlock.TYPE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Invalid block type."); + } else if (currentBlockStage != BLOCK_STAGE_CONTENT_WRITE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Header is not written."); + } + + closeBlock(NcSessionPayloadBlock.TYPE); } public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException { @@ -290,12 +386,8 @@ int dataSize = dataBuffer.remaining(); writeSessionPayloadHeader(timestamp, session, proto, dataSize); write(dataBuffer); - updateSegmentSize(dataSize); - } - - public void closeSessionPayloadBlock() throws Exception { - checkOpened(); - + updateSegmentInfo(dataSize); + closeBlock(NcSessionPayloadBlock.TYPE); } @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Tue Nov 21 10:43:31 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Tue Nov 21 14:39:48 2017 +0100 @@ -10,6 +10,8 @@ private long totalSize; + private int blocksNum; + @Override public byte getType() { return TYPE; @@ -23,4 +25,12 @@ this.totalSize = totalSize; } + public int blocksNum() { + return blocksNum; + } + + public void blocksNum(int blocksNum) { + this.blocksNum = blocksNum; + } + }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Tue Nov 21 10:43:31 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Tue Nov 21 14:39:48 2017 +0100 @@ -8,6 +8,8 @@ public static final byte TYPE = 4; + public static final int HEADER_SIZE = 14; + private byte status; @Override
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Nov 21 10:43:31 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Nov 21 14:39:48 2017 +0100 @@ -1,10 +1,14 @@ package com.passus.st.reader.nc; +import com.passus.data.DataHelper; import com.passus.st.emitter.SessionInfo; +import com.passus.st.reader.nc.block.NcSegmentBlock; +import com.passus.st.reader.nc.block.NcSessionInfoBlock; import java.io.File; import java.io.IOException; import java.util.UUID; import org.apache.commons.io.FileUtils; +import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; import org.testng.annotations.Test; @@ -14,6 +18,10 @@ */ public class NcDataBlockWriterTest { + private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; + + private final NcDataHelper ncDataHelper = NcDataHelper.getInstance(); + private File createTmpFile() throws IOException { File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st"); if (!tmpDir.exists()) { @@ -24,22 +32,6 @@ } @Test - public void testWrite_Header() throws Exception { - File tmpFile = createTmpFile(); - - try { - try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { - writer.open(); - } - - byte[] content = FileUtils.readFileToByteArray(tmpFile); - assertTrue(NcHeader.isPreambule(content, 0)); - } finally { - tmpFile.delete(); - } - } - - @Test public void testWrite_WritePayload() throws Exception { File tmpFile = createTmpFile(); @@ -52,8 +44,24 @@ writer.writeSessionPayload(time, session, (byte) 100, data); } + int offset = 0; byte[] content = FileUtils.readFileToByteArray(tmpFile); - assertTrue(NcHeader.isPreambule(content, 0)); + assertTrue(NcHeader.isPreambule(content, offset)); + offset += NcHeader.PREAMBULE.length; + assertEquals(NcDataHelper.VERSION_MAJOR, content[offset++]); + assertEquals(NcDataHelper.VERSION_MINOR, content[offset++]); + offset += 8; + + //SegmentBlock + assertEquals(NcSegmentBlock.TYPE, content[offset++]); + long segmentSize = dataHelper.getLong8(content, offset); + offset += 8; + int segmentBlocks = dataHelper.getInt2(content, offset); + offset += 2; + assertEquals(2, segmentBlocks); + + //SessionInfoBlock + assertEquals(NcSessionInfoBlock.TYPE, content[offset++]); } finally { tmpFile.delete(); }