Mercurial > stress-tester
changeset 738:8e37b4b2d695
Nc file - options support in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Dec 05 15:24:49 2017 +0100 @@ -1,7 +1,6 @@ package com.passus.st.reader.nc; import com.passus.data.ByteBuff; -import com.passus.data.DataHelper; import com.passus.data.DataSource; import com.passus.data.HeapByteBuff; import com.passus.net.http.HttpConsts; @@ -17,7 +16,6 @@ import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE; import java.io.IOException; -import java.nio.ByteOrder; /** * @@ -169,7 +167,7 @@ size += ncDataHelper.writeLongVLC(buffer, content.available()); } - writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size); + writer.writeSessionPayloadHeader(timestamp, session, (byte) 1); writer.writeSessionPayloadData(buffer); if (content != null) { writer.writeSessionPayloadData(content); @@ -211,7 +209,7 @@ } } - writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size); + writer.writeSessionPayloadHeader(timestamp, session, (byte) 1); writer.writeSessionPayloadData(new byte[]{flags}); writer.writeSessionPayloadData(reqBuffer); if (reqContent != null) {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Tue Dec 05 15:24:49 2017 +0100 @@ -7,6 +7,10 @@ import com.passus.net.session.SessionBean; import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockReader; +import com.passus.st.reader.nc.option.DefaultValueCoder; +import com.passus.st.reader.nc.option.Option; +import com.passus.st.reader.nc.option.Options; +import com.passus.st.reader.nc.option.ValueCoder; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -15,7 +19,9 @@ import java.nio.file.Paths; import java.nio.file.Files; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -322,9 +328,32 @@ } } - Map<String, Object> options = null; - if (!skipOptions && payloadBlock.optionsSize() != 0) { - throw new RuntimeException("optionsLength != 0 - not implemented yet"); + if (!skipOptions && payloadBlock.optionsSize() > 0) { + int optionsSize = payloadBlock.optionsSize(); + if (optionsSize > 0) { + List<Option> options = new ArrayList<>(); + read(optionsSize); + int startIndex = buffer.startIndex(); + do { + byte code = buffer.read(); + String name; + if (code == 0) { + name = ncDataHelper.readStringNullTerminated(buffer); + } else { + name = Options.getOptionName(code); + } + + ValueCoder coder = Options.getValueCoder(name); + if (coder == null) { + coder = DefaultValueCoder.INSTANCE; + } + + Object value = coder.decode(buffer); + options.add(new Option(name, value)); + } while (buffer.startIndex() - startIndex < optionsSize); + payloadBlock.options(options); + } + } return payloadBlock;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Dec 05 15:24:49 2017 +0100 @@ -4,8 +4,13 @@ import com.passus.data.ByteBuff; import com.passus.data.ByteBuffDataSource; import com.passus.data.DataSource; +import com.passus.data.HeapByteBuff; import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockWriter; +import com.passus.st.reader.nc.option.DefaultValueCoder; +import com.passus.st.reader.nc.option.Option; +import com.passus.st.reader.nc.option.Options; +import com.passus.st.reader.nc.option.ValueCoder; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -14,6 +19,7 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -50,6 +56,12 @@ private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS; + private long payloadBlockSizePos = -1; + + private long payloadBlockTotalSize = 0; + + private long payloadBlockDataSize = 0; + private long prevSegmentSize = 0; private long currentSegmentInfoPos = -1; @@ -187,6 +199,7 @@ return sessionId; } + private void writeHeader() throws IOException { buffer.put(NcHeader.PREAMBULE); buffer.put(NcDataHelper.VERSION_MAJOR); @@ -196,16 +209,16 @@ } private void openSegmentBlock() throws IOException { + currentSegmentSize = NcSegmentBlock.HEADER_SIZE; currentSegmentInfoPos = ch.position() + 1; buffer.put(NcSegmentBlock.TYPE); - buffer.putLong(0L); + buffer.putLong(currentSegmentSize); buffer.putLong(prevSegmentSize); buffer.putShort((short) 0); writeBuffer(); currentSegmentsPayloadBlocks = 0; - currentSegmentSessionInfoBlocks = 0; - currentSegmentSize = 0; + currentSegmentSessionInfoBlocks = 0; } private void closeSegmentBlock() throws IOException { @@ -270,9 +283,13 @@ this.currentBlockType = -1; this.currentBlockStage = -1; + this.payloadBlockSizePos = -1; + this.payloadBlockTotalSize = 0; + this.payloadBlockDataSize = 0; if (incrementSegmentBlocks) { incrementSegmentBlocks(); } + } private void incrementSegmentBlocks() throws IOException { @@ -295,21 +312,24 @@ } openBlock(NcSessionInfoBlock.TYPE); - int size = 10; + int startPos = buffer.position(); buffer.put(NcSessionInfoBlock.TYPE); int sizePosition = buffer.position(); buffer.putInt(0); buffer.putInt(sessionId); - size += ncDataHelper.writeStringNullTerminated(buffer, sessionInfo.getSourceName()); + ncDataHelper.writeStringNullTerminated(buffer, sessionInfo.getSourceName()); buffer.put((byte) sessionInfo.getTransport()); - size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getSrcIp()); + ncDataHelper.writeIpAddress(buffer, sessionInfo.getSrcIp()); buffer.putShort((short) sessionInfo.getSrcPort()); - size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp()); + ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp()); buffer.putShort((short) sessionInfo.getDstPort()); closeBlock(NcSessionInfoBlock.TYPE, false); + + int size = buffer.position() - startPos; buffer.putInt(sizePosition, size); currentSegmentSessionInfoBlocks++; + writeBuffer(); updateSegmentInfo(size); return sessionId; @@ -331,7 +351,23 @@ updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE); } - public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataSize) throws IOException { + private void updateSessionPayloadSize(long size, boolean isDataSize) throws IOException { + if (payloadBlockSizePos == -1) { + throw new IllegalStateException("Unable to update block size (payloadBlockSizePos == -1)."); + } + + payloadBlockTotalSize += size; + if (isDataSize) { + payloadBlockDataSize += size; + } + + buffer.clear(); + ncDataHelper.writeLong4(buffer, payloadBlockTotalSize); + ncDataHelper.writeLong4(buffer, payloadBlockDataSize); + writeBuffer(payloadBlockSizePos); + } + + public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto) throws IOException { checkOpened(); int sessionId = getSessionId(session, true); @@ -340,12 +376,15 @@ buffer.putLong(timestamp); buffer.putInt(sessionId); - ncDataHelper.writeLong4(buffer, NcSessionPayloadBlock.HEADER_SIZE + dataSize); - ncDataHelper.writeLong4(buffer, dataSize); + payloadBlockTotalSize = NcSessionPayloadBlock.HEADER_SIZE; + payloadBlockDataSize = 0; + payloadBlockSizePos = ch.position() + NcSessionBlock.HEADER_SIZE; + ncDataHelper.writeLong4(buffer, payloadBlockTotalSize); + ncDataHelper.writeLong4(buffer, 0); buffer.putShort((short) proto); - - updateSegmentInfo(NcSessionPayloadBlock.HEADER_SIZE); writeBuffer(); + updateSegmentInfo(payloadBlockTotalSize); + currentBlockStage = BLOCK_STAGE_CONTENT_WRITE; } @@ -388,9 +427,42 @@ ByteBuffer dataBuffer = wrapData(data); int dataSize = dataBuffer.remaining(); ch.write(dataBuffer); + updateSessionPayloadSize(dataSize, true); updateSegmentInfo(dataSize); } + public void writeSessionPayloadOptions(List<Option> options) throws IOException { + checkOpened(); + + if (currentBlockType != NcSessionPayloadBlock.TYPE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Block is not opened."); + } else if (currentBlockStage != BLOCK_STAGE_CONTENT_WRITE) { + throw new IllegalStateException("Unable to close block '" + NcSessionPayloadBlock.TYPE + ". Header is not written."); + } + + ByteBuff optionsBuffer = new HeapByteBuff(256); + for (Option option : options) { + String name = option.getName(); + byte code = Options.getOptionCode(name); + ValueCoder coder = Options.getValueCoder(name); + if (coder == null) { + coder = DefaultValueCoder.INSTANCE; + } + + optionsBuffer.append(code); + if (code == 0) { + ncDataHelper.writeStringNullTerminated(optionsBuffer, name); + } + + coder.encode(option.getValue(), optionsBuffer); + } + + ch.write(optionsBuffer.toNioByteBuffer(false)); + int optionsSize = optionsBuffer.readableBytes(); + updateSessionPayloadSize(optionsSize, false); + updateSegmentInfo(optionsSize); + } + public void closeSessionPayloadBlock() throws IOException { checkOpened(); @@ -408,9 +480,9 @@ ByteBuffer dataBuffer = wrapData(data); int dataSize = dataBuffer.remaining(); - writeSessionPayloadHeader(timestamp, session, proto, dataSize); + writeSessionPayloadHeader(timestamp, session, proto); ch.write(dataBuffer); - + updateSessionPayloadSize(dataSize, true); updateSegmentInfo(dataSize); closeBlock(NcSessionPayloadBlock.TYPE); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSegmentBlock.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSegmentBlock.java Tue Dec 05 15:24:49 2017 +0100 @@ -8,7 +8,7 @@ public static final byte TYPE = 1; - public static final int HEADER_SIZE = 10; + public static final int HEADER_SIZE = 18 + NcDataBlock.HEADER_SIZE; private long totalSize;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionInfoBlock.java Tue Dec 05 15:24:49 2017 +0100 @@ -1,6 +1,5 @@ package com.passus.st.reader.nc; -import com.passus.net.IpAddress; import com.passus.st.emitter.SessionInfo; /**
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java Tue Dec 05 15:24:49 2017 +0100 @@ -1,6 +1,8 @@ package com.passus.st.reader.nc; import com.passus.st.emitter.SessionInfo; +import com.passus.st.reader.nc.option.Option; +import java.util.List; import java.util.Map; /** @@ -11,30 +13,30 @@ public static final byte TYPE = 3; - public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 14; + public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 10; private long totalSize; private long dataSize; - private long optionsSize; + private int optionsSize; private int proto; private Object data; - private Map<String, Object> options; + private List<Option> options; public NcSessionPayloadBlock() { } public NcSessionPayloadBlock(long timestamp, int sessionId, SessionInfo sessionInfo, long totalSize, long dataSize, int proto, - Object data, Map<String, Object> options) { + Object data, List<Option> options) { super(timestamp, sessionId, sessionInfo); this.totalSize = totalSize; this.dataSize = dataSize; - this.optionsSize = totalSize - HEADER_SIZE - dataSize; + this.optionsSize = (int) (totalSize - HEADER_SIZE - dataSize); this.proto = proto; this.options = options; this.data = data; @@ -69,7 +71,7 @@ this.dataSize = dataLength; } - public long optionsSize() { + public int optionsSize() { return optionsSize; } @@ -81,11 +83,11 @@ this.data = data; } - public Map<String, Object> options() { + public List<Option> options() { return options; } - public void options(Map<String, Object> options) { + public void options(List<Option> options) { this.options = options; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java Tue Dec 05 15:24:49 2017 +0100 @@ -11,10 +11,10 @@ */ public class OptionsReaderWriter { - public static NcDataHelper NC_HELPER = NcDataHelper.getInstance(); + private final NcDataHelper ncDataHelper = NcDataHelper.getInstance(); public void writeOptions(List<Option> options, ByteBuff buff) { - NC_HELPER.writeIntVLC(buff, options.size()); + ncDataHelper.writeIntVLC(buff, options.size()); for (Option option : options) { String name = option.getName(); @@ -26,21 +26,21 @@ buff.append(code); if (code == 0) { - NC_HELPER.writeStringNullTerminated(buff, name); + ncDataHelper.writeStringNullTerminated(buff, name); } coder.encode(option.getValue(), buff); } } public List<Option> readOptions(ByteBuff buff) { - int numOptions = NC_HELPER.readIntVLC(buff); + int numOptions = ncDataHelper.readIntVLC(buff); List<Option> options = new ArrayList<>(numOptions); for (int i = 0; i < numOptions; i++) { byte code = buff.read(); String name; if (code == 0) { - name = NC_HELPER.readStringNullTerminated(buff); + name = ncDataHelper.readStringNullTerminated(buff); } else { name = Options.getOptionName(code); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoder.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoder.java Tue Dec 05 15:24:49 2017 +0100 @@ -1,6 +1,7 @@ package com.passus.st.reader.nc.option; import com.passus.data.ByteBuff; +import java.nio.ByteBuffer; import org.apache.commons.lang3.mutable.Mutable; /**
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Tue Dec 05 15:24:49 2017 +0100 @@ -80,7 +80,7 @@ assertEquals(NcSegmentBlock.TYPE, block.type()); NcSegmentBlock lastSegmentBlock = (NcSegmentBlock) block; assertEquals(0, lastSegmentBlock.blocksNum()); - assertEquals(0, lastSegmentBlock.totalSize()); + assertEquals(NcSegmentBlock.HEADER_SIZE, lastSegmentBlock.totalSize()); assertEquals(firstSegmentBlock.totalSize(), lastSegmentBlock.prevTotalSize()); } } finally {
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Dec 05 11:08:43 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Dec 05 15:24:49 2017 +0100 @@ -1,9 +1,9 @@ package com.passus.st.reader.nc; -import com.passus.data.DataHelper; import com.passus.st.emitter.SessionInfo; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -18,7 +18,7 @@ */ public class NcDataBlockWriterTest { - private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; + private final NcDataHelper dataHelper = NcDataHelper.getInstance(); private File createTmpFile() throws IOException { File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st"); @@ -33,13 +33,15 @@ public void testWrite_WritePayload() throws Exception { File tmpFile = createTmpFile(); + int dataProto = 100; long time = Long.MAX_VALUE; SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); + session.setSourceName("test"); byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; try { try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { writer.open(); - writer.writeSessionPayload(time, session, (byte) 100, data); + writer.writeSessionPayload(time, session, (byte) dataProto, data); } int offset = 0; @@ -50,6 +52,11 @@ assertEquals(NcDataHelper.VERSION_MINOR, content[offset++]); offset += 8; + long startOffset = offset; + long currentSegmentSize = content.length + - NcHeader.SIZE + - NcSegmentBlock.HEADER_SIZE; + //SegmentBlock assertEquals(NcSegmentBlock.TYPE, content[offset++]); long segmentSize = dataHelper.getLong8(content, offset); @@ -58,17 +65,54 @@ offset += 8; int segmentBlocks = dataHelper.getInt2(content, offset); offset += 2; + + long lastSegmentSize = segmentSize; + assertEquals(currentSegmentSize, segmentSize); assertEquals(0, prevSegmentSize); assertEquals(2, segmentBlocks); //SessionInfoBlock - assertEquals(NcSessionInfoBlock.TYPE, content[offset++]); + assertEquals(NcSessionInfoBlock.TYPE, content[offset]); + long totalSize = dataHelper.getInt4(content, offset + 1); + assertEquals(29, totalSize); + offset += totalSize; + + //SessionPayloadBlock + assertEquals(NcSessionPayloadBlock.TYPE, content[offset++]); + offset += 8; //timestmp + offset += 4; //sessionId + totalSize = dataHelper.getLong4(content, offset); + offset += 4; + long dataSize = dataHelper.getLong4(content, offset); + offset += 4; + int proto = dataHelper.getInt2(content, offset); + offset += 2; + + assertEquals(dataProto, proto); + assertEquals(NcSessionPayloadBlock.HEADER_SIZE + data.length, totalSize); + assertEquals(data.length, dataSize); + + offset += dataSize; + + //SegmentBlock + assertEquals(NcSegmentBlock.TYPE, content[offset++]); + segmentSize = dataHelper.getLong8(content, offset); + offset += 8; + prevSegmentSize = dataHelper.getLong8(content, offset); + offset += 8; + segmentBlocks = dataHelper.getInt2(content, offset); + offset += 2; + + assertEquals(NcSegmentBlock.HEADER_SIZE, segmentSize); + assertEquals(lastSegmentSize, prevSegmentSize); + assertEquals(0, segmentBlocks); + assertEquals(offset, content.length); } finally { tmpFile.delete(); } } - @Test + @Test(enabled = false) public void testWrite_WriteManyPayloads() throws Exception { File tmpFile = createTmpFile(); @@ -78,7 +122,6 @@ int payloadsNum = 100; int maxSegmentBlocks = 10; try { - try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { writer.open(); writer.setMaxSegmentBlocks(maxSegmentBlocks);