Mercurial > stress-tester
changeset 682:e420a96bed43
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Thu Nov 23 13:21:01 2017 +0100 @@ -32,7 +32,7 @@ private final int hashCode; - private String sourceName; + private String sourceName = ""; public SessionInfo(String srcSocket, String dstSocket) throws ParseException { this(new SocketAddress(srcSocket), new SocketAddress(dstSocket));
--- a/stress-tester/src/main/java/com/passus/st/reader/DataReader.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/DataReader.java Thu Nov 23 13:21:01 2017 +0100 @@ -1,45 +1,13 @@ package com.passus.st.reader; -import com.passus.net.IpAddress; +import java.io.IOException; /** * * @author Mirosław Hawrot */ -public interface DataReader { - - public long size(); - - public long remains(); - - public int position(); - - public void position(long pos); - - public void skip(int length); - - public int read(); - - public long readLength(); +public interface DataReader<T, K> { - public byte readByte(); - - public byte[] readBytes(int length); - - public int readInt2(); - - public int readInt3(); - - public int readInt4(); - - public long readLong4(); - - public long readLong8(); - - public String readString(); - - public String readStringNullTerminated(); - - public IpAddress readIpAddress(); + public T read(DataBlockReader<K> reader) throws IOException; }
--- a/stress-tester/src/main/java/com/passus/st/reader/DataWriter.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/DataWriter.java Thu Nov 23 13:21:01 2017 +0100 @@ -1,33 +1,13 @@ package com.passus.st.reader; -import com.passus.net.IpAddress; +import java.io.IOException; /** * * @author Mirosław Hawrot */ -public interface DataWriter { - - public void writeLength(long length); - - public void writeByte(byte value); - - public void writeBytes(byte[] value); - - public void writeInt2(int value); +public interface DataWriter<T, K> { - public void writeInt3(int value); - - public void writeInt4(int value); - - public void writeLong4(long value); - - public void writeLong8(long value); - - public void writeString(String value); - - public void writeStringNullTerminated(String value); - - public void writeIpAddress(IpAddress ipAddress); + public void write(T object, DataBlockWriter<K> writer) throws IOException; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Thu Nov 23 13:21:01 2017 +0100 @@ -3,6 +3,7 @@ import com.passus.commons.Assert; import com.passus.data.ByteBuff; import com.passus.data.DataHelper; +import com.passus.data.HeapByteBuff; import com.passus.net.IpAddress; import com.passus.net.session.SessionBean; import com.passus.st.emitter.SessionInfo; @@ -87,6 +88,8 @@ try { ch = FileChannel.open(path, StandardOpenOption.READ); + System.out.println("size: " + ch.size()); + buffer = new HeapByteBuff(bufferSize); nioBuffer = ByteBuffer.allocate(bufferSize); nioBuffers = new ByteBuffer[1]; nioBuffers[0] = nioBuffer; @@ -125,41 +128,52 @@ ch.position(ch.position() + length); } - private void read(int requiredBytes) throws IOException { + private int read(int requiredBytes) throws IOException { if (eof) { - return; + return -1; } - if (buffer.readableBytes() <= requiredBytes) { - return; + if (buffer.readableBytes() >= requiredBytes) { + return -1; } if (buffer.isEmpty() && buffer.readedBytes() > 0) { buffer.clear(); } - while (buffer.readableBytes() >= requiredBytes) { + int readed = 0; + while (buffer.readableBytes() < requiredBytes) { int res = ch.read(nioBuffer); if (res == -1) { eof = true; - break; + return -1; } + nioBuffer.flip(); + readed += nioBuffer.remaining(); buffer.append(nioBuffer); - buffer.clear(); + nioBuffer.clear(); } + + return readed; } @Override public void reset() throws IOException { header = null; + ch.position(NcHeader.SIZE); + buffer.clear(); + nioBuffer.clear(); + } + public long position() throws IOException { + return ch.position() - buffer.readedBytes(); } private void readHeader() throws IOException { read(NcHeader.SIZE); - if (!NcHeader.isPreambule(buffer.buffer(), nioBuffer.arrayOffset())) { + if (!NcHeader.isPreambule(buffer.buffer(), buffer.startIndex())) { throw new IOException("Invalid preambule."); } buffer.skipBytes(NcHeader.PREAMBULE.length); @@ -192,7 +206,7 @@ long totalSize = dataHelper.readLong8(buffer); long prevTotalSize = dataHelper.readLong8(buffer); - int blocksNum = dataHelper.readInt4(buffer); + int blocksNum = dataHelper.readInt2(buffer); return new NcSegmentBlock(totalSize, prevTotalSize, blocksNum); } @@ -217,7 +231,7 @@ IpAddress serverIp = ncDataHelper.readIpAddress(buffer); int serverPort = dataHelper.readInt2(buffer); - SessionInfo sessionInfo = new SessionInfo(serverIp, serverPort, clientIp, clientPort, serverPort); + SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport); sessionInfo.setSourceName(sourceName); if (sessionIdMap.put(sessionId, sessionInfo) != null) { throw new IOException("Multiple session info block for sessionId '" + sessionId + "'."); @@ -245,7 +259,7 @@ return new NcSessionStatusBlock(timestamp, sessionId, session, status); } - private NcSessionPayloadBlock readSessionPayloadBlock(boolean skipBlockType) throws IOException { + private NcSessionPayloadBlock readSessionPayloadBlockHeader(boolean skipBlockType) throws IOException { read(NcSessionPayloadBlock.MAX_HEADER_SIZE); if (!skipBlockType) { @@ -259,47 +273,92 @@ if (session == null) { throw new IOException("Invalid session id '" + sessionId + "'."); } - long totalSize = dataHelper.readLong8(buffer); + //long totalSize = dataHelper.readLong8(buffer); int proto = dataHelper.readInt2(buffer); long dataLength = ncDataHelper.readLongVLC(buffer); long optionsLength = ncDataHelper.readLongVLC(buffer); - //TODO Odczyt danych - Map<String, Object> options = null; - if (optionsLength != 0) { - throw new RuntimeException("optionsLength != 0 - not implemented"); + return new NcSessionPayloadBlock(timestamp, sessionId, session, -1, proto, dataLength, optionsLength, null, null); + } + + private int readSessionPayloadBlockContent(ByteBuff data, int length) throws IOException { + int readed = buffer.read(data, length); + while (readed < length) { + int res = read(bufferSize); + if (res == -1) { + throw new IOException("Unable to read full content. EOF reached."); + } + + readed += buffer.read(data, length - readed); } - return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, proto, dataLength, optionsLength, null, options); + return readed; } - public int peekNextBlockType() throws IOException { + private NcSessionPayloadBlock readSessionPayloadBlock(boolean skipBlockType, boolean skipData, boolean skipOptions) throws IOException { + NcSessionPayloadBlock payloadBlock = readSessionPayloadBlockHeader(skipBlockType); + + if (!skipData) { + if (payloadBlock.dataLength() <= Integer.MAX_VALUE) { + int dataLength = (int) payloadBlock.dataLength(); + ByteBuff data = new HeapByteBuff(dataLength); + readSessionPayloadBlockContent(data, dataLength); + payloadBlock.data(data); + } else { + long startPosition = position(); + long endPosition = startPosition + payloadBlock.dataLength(); + ReadonlyFileChannelDataSource ds = new ReadonlyFileChannelDataSource(ch, path, startPosition, endPosition); + payloadBlock.data(ds); + } + } + + Map<String, Object> options = null; + if (!skipOptions && payloadBlock.optionsLength() != 0) { + throw new RuntimeException("optionsLength != 0 - not implemented yet"); + } + + return payloadBlock; + } + + public int peekBlockType() throws IOException { read(1); return buffer.get(); } + public NcSessionStatusBlock readSessionStatusBlock() throws IOException { + return readSessionStatusBlock(false); + } + + public NcSessionPayloadBlock readSessionPayloadBlockHeader() throws IOException { + return readSessionPayloadBlockHeader(false); + } + + public int readSessionPayloadContent(ByteBuffer data, long length) { + + return 0; + } + @Override public NcDataBlock read() throws IOException { checkOpened(); + if (eof && buffer.isEmpty()) { + return null; + } read(1); byte blockType = buffer.read(); switch (blockType) { case NcSegmentBlock.TYPE: - readSegmentBlock(true); - break; + return readSegmentBlock(true); case NcSessionInfoBlock.TYPE: - readSessionInfoBlock(true); - break; + return readSessionInfoBlock(true); case NcSessionStatusBlock.TYPE: - readSessionStatusBlock(true); - break; + return readSessionStatusBlock(true); case NcSessionPayloadBlock.TYPE: - readSessionPayloadBlock(true); - break; + return readSessionPayloadBlock(true, false, false); + default: + throw new IOException("Unsupported block type '" + blockType + "'."); } - - return null; } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Thu Nov 23 13:21:01 2017 +0100 @@ -294,7 +294,6 @@ buffer.putShort((short) sessionInfo.getDstPort()); closeBlock(NcSessionInfoBlock.TYPE, false); buffer.putInt(sizePosition, size); - incrementSegmentBlocks(false); updateSegmentInfo(size); @@ -317,7 +316,7 @@ updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE); } - public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException { + public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataLength) throws IOException { checkOpened(); int sessionId = getSessionId(session, opened); @@ -325,11 +324,15 @@ buffer.put(NcSessionPayloadBlock.TYPE); buffer.putLong(timestamp); buffer.putInt(sessionId); - buffer.put(proto); - int size = 14; + //int totalSizePosition = buffer.position(); + //buffer.putLong(0L); + buffer.putShort((short) proto); + + int size = 18; size += ncDataHelper.writeLongVLC(dataLength, buffer); size += ncDataHelper.writeLongVLC(0, buffer); // OptionsLength + //buffer.putLong(totalSizePosition, size); updateSegmentInfo(size); writeBuffer(); @@ -388,13 +391,14 @@ closeBlock(NcSessionPayloadBlock.TYPE); } - public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException { + public void writeSessionPayload(long timestamp, SessionInfo session, int proto, Object data) throws IOException { checkOpened(); ByteBuffer dataBuffer = wrapData(data); int dataSize = dataBuffer.remaining(); writeSessionPayloadHeader(timestamp, session, proto, dataSize); - write(dataBuffer); + ch.write(dataBuffer); + updateSegmentInfo(dataSize); closeBlock(NcSessionPayloadBlock.TYPE); } @@ -403,7 +407,21 @@ public void write(NcDataBlock block) throws IOException { checkOpened(); - throw new RuntimeException("Not supported yet."); + switch (block.type()) { + case NcSegmentBlock.TYPE: + case NcSessionInfoBlock.TYPE: + throw new IllegalArgumentException("Only session status or session payload block should be written."); + case NcSessionStatusBlock.TYPE: + NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) block; + writeSessionStatus(statusBlock.timestamp(), statusBlock.sessionInfo(), statusBlock.status()); + break; + case NcSessionPayloadBlock.TYPE: + NcSessionPayloadBlock sessionPayload = (NcSessionPayloadBlock) block; + writeSessionPayload(sessionPayload.timestamp(), sessionPayload.sessionInfo(), sessionPayload.proto(), sessionPayload.data()); + break; + default: + throw new IllegalArgumentException("Not supported block '" + block.type() + "'."); + } } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Thu Nov 23 13:21:01 2017 +0100 @@ -216,11 +216,8 @@ } public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer, boolean sanitize) { - if (buffer.writableBytes() < value.length()) { - buffer.ensureCapacity(value.length()); - } - - int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.startIndex(), buffer.capacity(), sanitize); + buffer.ensureCapacity(value.length()); + int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.endIndex(), buffer.capacity(), sanitize); buffer.endIndex(buffer.endIndex() + len); return len; } @@ -264,7 +261,7 @@ int startIndex = buffer.startIndex(); int length = index - startIndex; String value = buffer.toString(startIndex, length, CHARSET); - buffer.skipBytes(length); + buffer.skipBytes(length + 1); return value; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/ReadonlyFileChannelDataSource.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/ReadonlyFileChannelDataSource.java Thu Nov 23 13:21:01 2017 +0100 @@ -15,7 +15,9 @@ */ public class ReadonlyFileChannelDataSource implements DataSource { - private final FileChannel ch; + private FileChannel ch; + + private final Path path; private long startPosition = -1; @@ -25,6 +27,8 @@ private byte[] data; + private boolean opened = false; + public ReadonlyFileChannelDataSource(File file) throws IOException { this(file.toPath()); } @@ -34,18 +38,18 @@ } public ReadonlyFileChannelDataSource(Path path) throws IOException { - this(FileChannel.open(path, StandardOpenOption.READ)); + this(FileChannel.open(path, StandardOpenOption.READ), path); } public ReadonlyFileChannelDataSource(Path path, long startPosition, long endPosition) throws IOException { - this(FileChannel.open(path, StandardOpenOption.READ), startPosition, endPosition); + this(FileChannel.open(path, StandardOpenOption.READ), path, startPosition, endPosition); } - public ReadonlyFileChannelDataSource(FileChannel ch) throws IOException { - this(ch, 0, ch.size()); + public ReadonlyFileChannelDataSource(FileChannel ch, Path path) throws IOException { + this(ch, path, 0, ch.size()); } - public ReadonlyFileChannelDataSource(FileChannel ch, long startPosition, long endPosition) { + public ReadonlyFileChannelDataSource(FileChannel ch, Path path, long startPosition, long endPosition) { Assert.greaterOrEqualZero(startPosition, "startPosition"); Assert.greaterOrEqualZero(endPosition, "endPosition"); Assert.notNull(ch, "channel"); @@ -54,6 +58,7 @@ } this.ch = ch; + this.path = path; this.startPosition = startPosition; this.endPosition = endPosition; } @@ -69,12 +74,24 @@ } @Override - public void open() { - + public void open() throws IOException { + if (!ch.isOpen()) { + ch = FileChannel.open(path, StandardOpenOption.READ); + opened = true; + } } @Override public void close() throws IOException { + if (opened) { + try { + ch.close(); + } catch (Exception ignore) { + } + + opened = false; + } + dataBb = null; data = null; } @@ -115,13 +132,13 @@ } ByteBuffer dataBb = (this.data == data ? this.dataBb : ByteBuffer.wrap(data)); - + int available = available(); int limit = Math.min(offset + length, dataBb.capacity()); - if( limit > available ) { + if (limit > available) { limit = available; } - + dataBb.limit(limit); dataBb.position(offset);
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionBlock.java Thu Nov 23 13:21:01 2017 +0100 @@ -41,4 +41,12 @@ this.sessionId = sessionId; } + public SessionInfo sessionInfo() { + return sessionInfo; + } + + public void sessionInfo(SessionInfo sessionInfo) { + this.sessionInfo = sessionInfo; + } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Thu Nov 23 13:21:01 2017 +0100 @@ -0,0 +1,96 @@ +package com.passus.st.reader.nc; + +import com.passus.data.ByteBuff; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.reader.nc.block.NcDataBlock; +import com.passus.st.reader.nc.block.NcSegmentBlock; +import com.passus.st.reader.nc.block.NcSessionInfoBlock; +import com.passus.st.reader.nc.block.NcSessionPayloadBlock; +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.util.UUID; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertSame; +import static org.testng.AssertJUnit.assertTrue; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class NcDataBlockReaderTest { + + private final SessionInfo session; + + private final byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9}; + + public NcDataBlockReaderTest() throws ParseException { + session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); + } + + private File createTmpFile() throws IOException { + File tmpDir = new File(System.getProperty("java.io.tmpdir"), "st"); + if (!tmpDir.exists()) { + tmpDir.mkdir(); + } + + return new File(tmpDir, "st_" + UUID.randomUUID().toString() + ".tmp"); + } + + private File writeData() throws Exception { + File tmpFile = createTmpFile(); + + long time = Long.MAX_VALUE; + try (NcDataBlockWriter writer = new NcDataBlockWriter(tmpFile)) { + writer.open(); + writer.writeSessionPayload(time, session, (byte) 100, data); + } + + return tmpFile; + } + + @Test + public void testRead() throws Exception { + File file = null; + + try { + file = writeData(); + try (NcDataBlockReader reader = new NcDataBlockReader(file)) { + reader.open(); + + NcDataBlock block = reader.read(); + assertEquals(NcSegmentBlock.TYPE, block.type()); + NcSegmentBlock firstSegmentBlock = (NcSegmentBlock) block; + assertEquals(2, firstSegmentBlock.blocksNum()); + + block = reader.read(); + assertEquals(NcSessionInfoBlock.TYPE, block.type()); + NcSessionInfoBlock sessionInfoBlock = (NcSessionInfoBlock) block; + assertEquals(session, sessionInfoBlock.sessionInfo()); + + block = reader.read(); + assertEquals(NcSessionPayloadBlock.TYPE, block.type()); + NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block; + assertSame(sessionInfoBlock.sessionInfo(), payloadBlock.sessionInfo()); + assertEquals(data.length, payloadBlock.dataLength()); + assertEquals(0, payloadBlock.optionsLength()); + assertTrue(payloadBlock.data() instanceof ByteBuff); + ByteBuff payload = (ByteBuff) payloadBlock.data(); + assertEquals(data, payload.toArray()); + + /*block = reader.read(); + assertEquals(NcSegmentBlock.TYPE, block.type()); + NcSegmentBlock lastSegmentBlock = (NcSegmentBlock) block; + assertEquals(0, lastSegmentBlock.blocksNum()); + assertEquals(0, lastSegmentBlock.totalSize()); + assertEquals(firstSegmentBlock.totalSize(), lastSegmentBlock.prevTotalSize());*/ + } + } finally { + if (file != null) { + file.delete(); + } + } + } + +}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java Wed Nov 22 15:40:56 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java Thu Nov 23 13:21:01 2017 +0100 @@ -1,5 +1,6 @@ package com.passus.st.reader.nc; +import com.passus.commons.AsciiUtils; import com.passus.data.ByteBuff; import com.passus.data.HeapByteBuff; import java.nio.ByteBuffer; @@ -14,7 +15,7 @@ * @author Mirosław Hawrot */ public class NcDataHelperTest { - + private static final byte B_FF = (byte) 0xff; private static final byte B_80 = (byte) 0x80; @@ -104,4 +105,43 @@ helper.writeLongVLC(length, result); assertEquals(expectedResult, result); } + + @Test + public void testWriteStringNullTerminated() { + ByteBuffer buffer = ByteBuffer.allocate(256); + + helper.writeStringNullTerminated("", buffer); + buffer.flip(); + assertEquals(1, buffer.remaining()); + assertEquals(AsciiUtils.NUL, buffer.get()); + buffer.clear(); + + helper.writeStringNullTerminated(null, buffer); + buffer.flip(); + assertEquals(1, buffer.remaining()); + assertEquals(AsciiUtils.NUL, buffer.get()); + buffer.clear(); + + helper.writeStringNullTerminated("123", buffer); + buffer.flip(); + assertEquals(4, buffer.remaining()); + assertEquals(0x31, buffer.get()); + assertEquals(0x32, buffer.get()); + assertEquals(0x33, buffer.get()); + assertEquals(AsciiUtils.NUL, buffer.get()); + + } + + @Test + public void testReadStringNullTerminated() { + ByteBuff buffer = new HeapByteBuff(new byte[]{0x31, 0x32, 0x33, AsciiUtils.NUL}); + String res = helper.readStringNullTerminated(buffer); + assertEquals("123", res); + assertEquals(0, buffer.readableBytes()); + + buffer = new HeapByteBuff(new byte[]{AsciiUtils.NUL}); + res = helper.readStringNullTerminated(buffer); + assertEquals("", res); + assertEquals(0, buffer.readableBytes()); + } }