Mercurial > stress-tester
changeset 674:b88d3359c1c2
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Mon Nov 20 15:15:56 2017 +0100 @@ -7,6 +7,10 @@ import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockWriter; import com.passus.st.reader.nc.block.NcDataBlock; +import com.passus.st.reader.nc.block.NcSegmentBlock; +import com.passus.st.reader.nc.block.NcSessionInfoBlock; +import com.passus.st.reader.nc.block.NcSessionPayloadBlock; +import com.passus.st.reader.nc.block.NcSessionStatusBlock; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -39,10 +43,14 @@ private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS; + private long currentSegmentSizePos = -1; + private int currentSegmentsBlocks = -1; private long currentSegmentSize = -1; + private boolean updateSegmentSizeOnClose = true; + private final Map<SessionInfo, Integer> sessionIdMap = new HashMap<>(); private ByteBuffer buffer = ByteBuffer.allocate(bufferSize); @@ -75,6 +83,12 @@ buff.clear(); } + private void write(ByteBuffer buff, long position) throws IOException { + buff.flip(); + ch.write(buff, position); + buff.clear(); + } + private void write(ByteBuffer[] buffers) throws IOException { for (ByteBuffer buff : buffers) { buff.flip(); @@ -89,6 +103,10 @@ write(buffer); } + private void writeBuffer(long position) throws IOException { + write(buffer, position); + } + public void flush() throws IOException { ch.force(true); } @@ -98,6 +116,7 @@ try { ch = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); writeHeader(); + openSegmentBlock(); opened = true; } catch (IOException e) { close(); @@ -108,6 +127,8 @@ @Override public void close() throws IOException { if (ch != null) { + closeSegmentBlock(); + try { flush(); ch.close(); @@ -133,12 +154,6 @@ return sessionId; } - private void resetSegmentInfo() { - currentSegmentsBlocks = -1; - currentSegmentSize = -1; - sessionIdMap.clear(); - } - private int writeHeader() throws IOException { buffer.clear(); buffer.put(NcHeader.PREAMBULE); @@ -149,11 +164,47 @@ return NcHeader.SIZE; } - private void writeSegmentBlock(long position) throws IOException { + private void openSegmentBlock() throws IOException { + buffer.put(NcSegmentBlock.TYPE); + currentSegmentSizePos = ch.position(); + buffer.putLong(0L); + currentSegmentsBlocks = 0; currentSegmentSize = 0; } + private void closeSegmentBlock() throws IOException { + if (currentSegmentSizePos == -1) { + throw new IllegalStateException("Unable to update segment size. Segment not created."); + } + + updateSegmentInfo(-1, true); + currentSegmentSizePos = -1; + currentSegmentsBlocks = -1; + currentSegmentSize = -1; + sessionIdMap.clear(); + } + + private void updateSegmentSize(long size) throws IOException { + updateSegmentInfo(size, false); + } + + private void updateSegmentInfo(long size, boolean close) throws IOException { + if (currentSegmentSizePos == -1) { + throw new IllegalStateException("Unable to update segment size. Segment not created."); + } + + if (size >= 0) { + currentSegmentSize += size; + } + + if (updateSegmentSizeOnClose && !close) { + buffer.clear(); + buffer.putLong(currentSegmentSize); + writeBuffer(currentSegmentSizePos); + } + } + private int writeSessionInfoBlock(SessionInfo sessionInfo) { int size = sessionIdMap.size(); int sessionId = size + 1; @@ -162,6 +213,7 @@ throw new IllegalStateException("Session block '" + sessionInfo + "' already exists."); } + buffer.put(NcSessionInfoBlock.TYPE); buffer.putInt(sessionId); dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer); buffer.put((byte) sessionInfo.getTransport()); @@ -176,6 +228,7 @@ checkOpened(); int sessionId = getSessionId(session, true); + buffer.put(NcSessionStatusBlock.TYPE); buffer.putLong(timestamp); buffer.putInt(sessionId); buffer.put(status); @@ -188,6 +241,7 @@ int sessionId = getSessionId(session, opened); buffer.clear(); + buffer.put(NcSessionPayloadBlock.TYPE); buffer.putLong(timestamp); buffer.putInt(sessionId); buffer.put(proto); @@ -224,15 +278,24 @@ public void writeSessionPayloadContent(Object data) throws IOException { checkOpened(); ByteBuffer dataBuffer = wrapData(data); + int dataSize = dataBuffer.remaining(); write(dataBuffer); + updateSegmentSize(dataSize); } public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException { checkOpened(); ByteBuffer dataBuffer = wrapData(data); - writeSessionPayloadHeader(timestamp, session, proto, dataBuffer.remaining()); + int dataSize = dataBuffer.remaining(); + writeSessionPayloadHeader(timestamp, session, proto, dataSize); write(dataBuffer); + updateSegmentSize(dataSize); + } + + public void closeSessionPayloadBlock() throws Exception { + checkOpened(); + } @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -6,6 +6,6 @@ */ public interface NcDataBlock { - public int getType(); + public byte getType(); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -6,13 +6,21 @@ */ public class NcSegmentBlock implements NcDataBlock { - public static final int TYPE = 1; + public static final byte TYPE = 1; private long totalSize; @Override - public int getType() { + public byte getType() { return TYPE; } + public long totalSize() { + return totalSize; + } + + public void totalSize(long totalSize) { + this.totalSize = totalSize; + } + }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -4,7 +4,7 @@ * * @author Mirosław Hawrot */ -public abstract class NcSessionBlock { +public abstract class NcSessionBlock implements NcDataBlock { private long timestamp;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -6,7 +6,9 @@ * * @author Mirosław Hawrot */ -public class NcSessionInfoBlock { +public class NcSessionInfoBlock implements NcDataBlock { + + public static final byte TYPE = 2; private int sessionId; @@ -20,6 +22,11 @@ private int serverPort; + @Override + public byte getType() { + return TYPE; + } + public int sessionId() { return sessionId; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -6,9 +6,7 @@ */ public class NcSessionPayloadBlock extends NcSessionBlock { - public static final byte FLAG_ENCODED = 1; - - private byte flags; + public static final byte TYPE = 3; private int proto; @@ -18,12 +16,9 @@ private int optionsLength = 0; - public byte flags() { - return flags; - } - - public void flags(byte flags) { - this.flags = flags; + @Override + public byte getType() { + return TYPE; } public int proto() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Mon Nov 20 15:15:56 2017 +0100 @@ -6,8 +6,15 @@ */ public class NcSessionStatusBlock extends NcSessionBlock { + public static final byte TYPE = 4; + private byte status; + @Override + public byte getType() { + return TYPE; + } + public byte status() { return status; }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Mon Nov 20 14:20:20 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Mon Nov 20 15:15:56 2017 +0100 @@ -45,10 +45,11 @@ long time = Long.MAX_VALUE; SessionInfo session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); + byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; try { try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { writer.open(); - writer.writeSessionPayload(time, session, (byte) 100, null); + writer.writeSessionPayload(time, session, (byte) 100, data); } byte[] content = FileUtils.readFileToByteArray(tmpFile);