Mercurial > stress-tester
changeset 679:14e07598bc8c
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Wed Nov 22 14:47:24 2017 +0100 @@ -3,9 +3,14 @@ import com.passus.data.ByteBuff; import com.passus.data.ByteString; import com.passus.data.DataHelper; +import com.passus.net.http.HttpConsts; import com.passus.net.http.HttpHeaders; import com.passus.net.http.HttpMessage; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.net.http.HttpStatus; import static com.passus.st.reader.nc.NcHttpDataUtils.CUSTOM_HEADER_CODE; +import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; import java.io.IOException; /** @@ -18,12 +23,23 @@ private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; - public HttpHeaders readHeaders(ByteBuff buffer) { - long headersSize = ncDataHelper.readLongVLC(buffer); - long readed = 0; + private ByteString readVersion(ByteBuff buff) throws IOException { + byte b = buff.read(); + if (b == NcHttpDataUtils.VERSION_1_0) { + return HttpConsts.VERSION_1_0; + } else if (b == NcHttpDataUtils.VERSION_1_1) { + return HttpConsts.VERSION_1_1; + } + + throw new IOException("Not supported HTTP version '" + b + "'."); + } + + public HttpHeaders readHeaders(ByteBuff buffer) throws IOException { + long headerSize = ncDataHelper.readLongVLC(buffer); + int startIndex = buffer.startIndex(); HttpHeaders headers = new HttpHeaders(); - while (readed < headersSize) { + for (;;) { byte headerCode = buffer.read(); ByteString headerName; if (headerCode == CUSTOM_HEADER_CODE) { @@ -34,11 +50,43 @@ ByteString headerValue = ncDataHelper.readByteStringNullTerminated(buffer); headers.add(headerName, headerValue); + + int readed = buffer.startIndex() - startIndex; + if (readed == headerSize) { + break; + } else if (readed > headerSize) { + throw new IOException("Readed bytes > header size."); + } } return headers; } + private HttpMessage readMessage(ByteBuff buffer) throws IOException { + int flags = dataHelper.readInt4(buffer); + + HttpMessage msg; + if ((flags & FLAG_REQUEST) != 0) { + ByteString method = ncDataHelper.readByteStringNullTerminated(buffer); + ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer); + + msg = new HttpRequest(method, uri); + } else { + int statusCode = dataHelper.readInt2(buffer); + ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer); + + HttpStatus status = new HttpStatus(statusCode, reasonPhrase); + msg = new HttpResponse(status); + } + + msg.setVersion(readVersion(buffer)); + + HttpHeaders headers = readHeaders(buffer); + msg.setHeaders(headers); + + return msg; + } + private HttpMessage read(NcDataBlockReader reader) throws IOException { return null; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Wed Nov 22 14:47:24 2017 +0100 @@ -26,8 +26,7 @@ private final DataHelper dataHelper = DataHelper.get(ByteOrder.BIG_ENDIAN); - private int encodeHeaders(HttpMessage msg, ByteBuff buff) { - HttpHeaders headers = msg.getHeaders(); + private int writeHeaders(HttpHeaders headers, ByteBuff buff) { ByteBuff headersBuff = new HeapByteBuff(); for (HttpHeaderEntry entry : headers.getEntries()) { int code = NcHttpDataUtils.headerToCode(entry.getName()); @@ -87,19 +86,19 @@ len++; } - buff.append(flags); - len += encodeHeaders(msg, buff); - + len += writeHeaders(msg.getHeaders(), buff); if (content != null) { len += content.available(); } writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, len); writer.writeSessionPayloadContent(buff); - + if (content != null) { writer.writeSessionPayloadContent(content); } + + writer.closeSessionPayloadBlock(); } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Wed Nov 22 14:47:24 2017 +0100 @@ -3,8 +3,15 @@ import com.passus.commons.Assert; import com.passus.data.ByteBuff; import com.passus.data.DataHelper; +import com.passus.net.IpAddress; +import com.passus.net.session.SessionBean; +import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockReader; import com.passus.st.reader.nc.block.NcDataBlock; +import com.passus.st.reader.nc.block.NcSegmentBlock; +import com.passus.st.reader.nc.block.NcSessionInfoBlock; +import com.passus.st.reader.nc.block.NcSessionPayloadBlock; +import com.passus.st.reader.nc.block.NcSessionStatusBlock; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -13,6 +20,8 @@ import java.nio.file.Paths; import java.nio.file.Files; import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; /** * @@ -27,7 +36,9 @@ private NcHeader header; - private DataHelper helper = DataHelper.BIG_ENDIAN; + private DataHelper dataHelper = DataHelper.BIG_ENDIAN; + + private NcDataHelper ncDataHelper = NcDataHelper.getInstance(); private final Path path; @@ -37,11 +48,17 @@ private ByteBuffer nioBuffer; + private ByteBuffer[] nioBuffers; + private ByteBuff buffer; - private ByteBuffer[] buffers; + private boolean opened = false; - private boolean opened = false; + private boolean eof = false; + + private final Map<Integer, SessionInfo> sessionIdMap = new HashMap<>(); + + private NcSegmentBlock currentSegmentBlock; public NcDataBlockReader(String fileName) { this.path = Paths.get(fileName); @@ -71,8 +88,8 @@ try { ch = FileChannel.open(path, StandardOpenOption.READ); nioBuffer = ByteBuffer.allocate(bufferSize); - buffers = new ByteBuffer[1]; - buffers[0] = nioBuffer; + nioBuffers = new ByteBuffer[1]; + nioBuffers[0] = nioBuffer; opened = true; readHeader(); @@ -94,7 +111,7 @@ ch = null; nioBuffer = null; - buffers = null; + nioBuffers = null; opened = false; } @@ -108,15 +125,28 @@ ch.position(ch.position() + length); } - private void fill(int requiredBytes) throws IOException { + private void read(int requiredBytes) throws IOException { + if (eof) { + return; + } + + if (buffer.readableBytes() <= requiredBytes) { + return; + } + + if (buffer.isEmpty() && buffer.readedBytes() > 0) { + buffer.clear(); + } + while (buffer.readableBytes() >= requiredBytes) { int res = ch.read(nioBuffer); if (res == -1) { + eof = true; break; } nioBuffer.flip(); - - buffer.append(buffer); + buffer.append(nioBuffer); + buffer.clear(); } } @@ -126,45 +156,118 @@ } - //public - private void readBytes(int offset, int length) throws IOException { - nioBuffer.clear(); - long readed = ch.read(buffers, offset, length); - } + private void readHeader() throws IOException { + read(NcHeader.SIZE); - private void readSessionInfoBlock() { + if (!NcHeader.isPreambule(buffer.buffer(), nioBuffer.arrayOffset())) { + throw new IOException("Invalid preambule."); + } + buffer.skipBytes(NcHeader.PREAMBULE.length); - } + byte verMajor = buffer.read(); + byte verMinor = buffer.read(); + if (verMajor != VERSION_MAJOR || verMinor != VERSION_MINOR) { + throw new IOException("Not supported nc file version."); + } - private void readBlock() { + buffer.skipBytes(8); //reserved bytes + header = new NcHeader(verMajor, verMinor); + nioBuffer.position(NcHeader.PREAMBULE.length - 1); } - private void readHeader() throws IOException { - readBytes(0, bufferSize); - - byte[] data = nioBuffer.array(); - int offset = nioBuffer.arrayOffset(); + private NcSegmentBlock readSegmentBlock(boolean skipBlockType) throws IOException { + read(NcSegmentBlock.SIZE); - if (!NcHeader.isPreambule(data, offset)) { - throw new IOException("Invalid preambule."); - } - offset += NcHeader.PREAMBULE.length; - - byte verMajor = data[offset++]; - byte varMinor = data[offset++]; - if (varMinor != VERSION_MAJOR || varMinor != VERSION_MINOR) { - throw new IOException("Not supported version."); + if (!skipBlockType) { + byte blockType = buffer.read(); + if (blockType != NcSegmentBlock.TYPE) { + throw new IOException("Invalid block type (required " + NcSegmentBlock.TYPE + " given " + blockType + ")"); + } } - header = new NcHeader(verMajor, varMinor); - nioBuffer.position(NcHeader.PREAMBULE.length - 1); + long totalSize = dataHelper.readLong8(buffer); + long prevTotalSize = dataHelper.readLong8(buffer); + int blocksNum = dataHelper.readInt4(buffer); + return new NcSegmentBlock(totalSize, prevTotalSize, blocksNum); + } + + private NcSessionInfoBlock readSessionInfoBlock(boolean skipBlockType) throws IOException { + read(NcSessionInfoBlock.MAX_SIZE); + + if (!skipBlockType) { + byte blockType = buffer.read(); + if (blockType != NcSegmentBlock.TYPE) { + throw new IOException("Invalid block type (required " + NcSessionInfoBlock.TYPE + " given " + blockType + ")."); + } + } + + int totalSize = dataHelper.readInt4(buffer); + int sessionId = dataHelper.readInt4(buffer); + String sourceName = ncDataHelper.readStringNullTerminated(buffer); + byte transport = buffer.read(); + if (transport != SessionBean.PROTOCOL_TCP && transport != SessionBean.PROTOCOL_UDP) { + throw new IOException("Not supported session transport '" + transport + "'."); + } + + IpAddress clientIp = ncDataHelper.readIpAddress(buffer); + int clientPort = dataHelper.readInt2(buffer); + IpAddress serverIp = ncDataHelper.readIpAddress(buffer); + int serverPort = dataHelper.readInt2(buffer); + + SessionInfo sessionInfo = new SessionInfo(serverIp, serverPort, clientIp, clientPort, serverPort); + sessionInfo.setSourceName(sourceName); + if (sessionIdMap.put(sessionId, sessionInfo) != null) { + throw new IOException("Multiple session info block for sessionId '" + sessionId + "'."); + } + + return new NcSessionInfoBlock(totalSize, sessionId, sessionInfo); + } + + private NcSessionStatusBlock readSessionStatusBlock(boolean skipBlockType) throws IOException { + read(NcSessionStatusBlock.SIZE); + + if (!skipBlockType) { + byte blockType = buffer.read(); + if (blockType != NcSegmentBlock.TYPE) { + throw new IOException("Invalid block type (required " + NcSegmentBlock.TYPE + " given " + blockType + ")."); + } + } + + byte status = buffer.read(); + return new NcSessionStatusBlock(status); + } + + private NcSessionPayloadBlock readSessionPayloadBlock(boolean skipBlockType) throws IOException { + //read(NcSessionPayloadBlock.SIZE); + return null; + } + + public int peekNextBlockType() throws IOException { + read(1); + return buffer.get(); } @Override public NcDataBlock read() throws IOException { checkOpened(); + read(1); + byte blockType = buffer.read(); + switch (blockType) { + case NcSegmentBlock.TYPE: + readSegmentBlock(true); + break; + case NcSessionInfoBlock.TYPE: + readSessionInfoBlock(true); + break; + case NcSessionStatusBlock.TYPE: + readSessionStatusBlock(true); + break; + case NcSessionPayloadBlock.TYPE: + break; + } + return null; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Wed Nov 22 14:47:24 2017 +0100 @@ -53,6 +53,8 @@ private int maxSegmentBlocks = DEFAULT_MAX_SEGMENT_BLOCKS; + private long prevSegmentSize = 0; + private long currentSegmentInfoPos = -1; private int currentSegmentsBlocks = -1; @@ -188,6 +190,7 @@ currentSegmentInfoPos = ch.position() + 1; buffer.put(NcSegmentBlock.TYPE); buffer.putLong(0L); + buffer.putLong(0L); buffer.putShort((short) 0); writeBuffer(); @@ -201,6 +204,7 @@ } updateSegmentInfo(-1, true); + prevSegmentSize = currentSegmentSize; currentSegmentInfoPos = -1; currentSegmentsBlocks = -1; currentSegmentSize = -1; @@ -223,6 +227,7 @@ if (close || !updateSegmentSizeOnClose) { buffer.clear(); buffer.putLong(currentSegmentSize); + buffer.putLong(prevSegmentSize); buffer.putShort((short) currentSegmentsBlocks); writeBuffer(currentSegmentInfoPos); } @@ -277,6 +282,9 @@ openBlock(NcSessionInfoBlock.TYPE); int size = 10; buffer.put(NcSessionInfoBlock.TYPE); + + int sizePosition = buffer.position(); + buffer.putInt(0); buffer.putInt(sessionId); size += dataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer); buffer.put((byte) sessionInfo.getTransport()); @@ -285,10 +293,10 @@ size += dataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer); buffer.putShort((short) sessionInfo.getDstPort()); closeBlock(NcSessionInfoBlock.TYPE, false); + buffer.putInt(sizePosition, size); incrementSegmentBlocks(false); updateSegmentInfo(size); - return sessionId; } @@ -306,7 +314,7 @@ writeBuffer(); closeBlock(NcSessionStatusBlock.TYPE); - updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE); + updateSegmentInfo(NcSessionStatusBlock.SIZE); } public void writeSessionPayloadHeader(long timestamp, SessionInfo session, byte proto, long dataLength) throws IOException { @@ -367,7 +375,7 @@ updateSegmentInfo(dataSize); } - public void closeSessionPayloadBlock() throws Exception { + public void closeSessionPayloadBlock() throws IOException { checkOpened(); if (currentBlockType != NcSessionPayloadBlock.TYPE) { @@ -393,6 +401,8 @@ @Override public void write(NcDataBlock block) throws IOException { checkOpened(); + + throw new RuntimeException("Not supported yet."); } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Wed Nov 22 14:47:24 2017 +0100 @@ -1,12 +1,14 @@ package com.passus.st.reader.nc; import com.passus.commons.AsciiUtils; -import com.passus.commons.utils.FormatUtils; import com.passus.data.ByteBuff; import com.passus.data.ByteBuffUtils; import com.passus.data.ByteString; import com.passus.data.DataUtils; +import com.passus.net.Ip4Address; +import com.passus.net.Ip6Address; import com.passus.net.IpAddress; +import com.passus.net.utils.AddressUtils; import java.nio.ByteBuffer; import java.nio.charset.Charset; import org.apache.commons.lang3.mutable.MutableInt; @@ -256,6 +258,19 @@ return len + 1; } + public String readStringNullTerminated(ByteBuff buffer) { + int index = ByteBuffUtils.localize(buffer, AsciiUtils.NUL); + if (index != -1) { + int startIndex = buffer.startIndex(); + int length = index - startIndex; + String value = buffer.toString(startIndex, length, CHARSET); + buffer.skipBytes(length); + return value; + } + + throw new IllegalArgumentException("Unable to find NULL delimiter."); + } + public int writeStringNullTerminated(String value, ByteBuffer buffer) { if (value != null && !value.isEmpty()) { byte[] data = value.getBytes(CHARSET); @@ -275,6 +290,21 @@ return data.length + 1; } + public IpAddress readIpAddress(ByteBuff buffer) { + byte ipVer = buffer.read(); + if (ipVer == IpAddress.IP4) { + byte[] data = new byte[4]; + buffer.read(data, 0, 4); + return new Ip4Address(data); + } else if (ipVer == IpAddress.IP6) { + byte[] data = new byte[16]; + buffer.read(data, 0, 16); + return new Ip6Address(data); + } + + throw new IllegalArgumentException("Invalid ip version '" + ipVer + "'."); + } + public static NcDataHelper getInstance() { return THREAD_LOCAL.get(); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcDataBlock.java Wed Nov 22 14:47:24 2017 +0100 @@ -6,6 +6,6 @@ */ public interface NcDataBlock { - public byte getType(); + public byte type(); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSegmentBlock.java Wed Nov 22 14:47:24 2017 +0100 @@ -8,12 +8,24 @@ public static final byte TYPE = 1; + public static final int SIZE = 10; + private long totalSize; + private long prevTotalSize; + private int blocksNum; + public NcSegmentBlock() { + } + + public NcSegmentBlock(long totalSize, long prevTotalSize, int blocksNum) { + this.totalSize = totalSize; + this.blocksNum = blocksNum; + } + @Override - public byte getType() { + public byte type() { return TYPE; } @@ -25,6 +37,14 @@ this.totalSize = totalSize; } + public long prevTotalSize() { + return prevTotalSize; + } + + public void prevTotalSize(long prevTotalSize) { + this.prevTotalSize = prevTotalSize; + } + public int blocksNum() { return blocksNum; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionInfoBlock.java Wed Nov 22 14:47:24 2017 +0100 @@ -1,6 +1,7 @@ package com.passus.st.reader.nc.block; import com.passus.net.IpAddress; +import com.passus.st.emitter.SessionInfo; /** * @@ -10,23 +11,36 @@ public static final byte TYPE = 2; + public static final int MAX_SIZE = 338; + + private int totalSize; + private int sessionId; - private byte transport; - - private IpAddress clientAddress; + private SessionInfo sessionInfo; - private int clientPort; + public NcSessionInfoBlock() { + } - private IpAddress serverAddress; - - private int serverPort; + public NcSessionInfoBlock(int totalSize, int sessionId, SessionInfo sessionInfo) { + this.totalSize = totalSize; + this.sessionId = sessionId; + this.sessionInfo = sessionInfo; + } @Override - public byte getType() { + public byte type() { return TYPE; } + public int totalSize() { + return totalSize; + } + + public void totalSize(int totalSize) { + this.totalSize = totalSize; + } + public int sessionId() { return sessionId; } @@ -35,44 +49,12 @@ this.sessionId = sessionId; } - public byte transport() { - return transport; - } - - public void transport(byte transport) { - this.transport = transport; - } - - public IpAddress clientAddress() { - return clientAddress; - } - - public void clientAddress(IpAddress clientAddress) { - this.clientAddress = clientAddress; + public SessionInfo sessionInfo() { + return sessionInfo; } - public int clientPort() { - return clientPort; - } - - public void clientPort(int clientPort) { - this.clientPort = clientPort; - } - - public IpAddress serverAddress() { - return serverAddress; - } - - public void serverAddress(IpAddress serverAddress) { - this.serverAddress = serverAddress; - } - - public int serverPort() { - return serverPort; - } - - public void serverPort(int serverPort) { - this.serverPort = serverPort; + public void sessionInfo(SessionInfo sessionInfo) { + this.sessionInfo = sessionInfo; } }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionPayloadBlock.java Wed Nov 22 14:47:24 2017 +0100 @@ -1,5 +1,7 @@ package com.passus.st.reader.nc.block; +import java.util.Map; + /** * * @author Mirosław Hawrot @@ -8,6 +10,8 @@ public static final byte TYPE = 3; + private long totalSize; + private int proto; private long dataLength; @@ -16,11 +20,31 @@ private int optionsLength = 0; + private Map<String, Object> options; + + public NcSessionPayloadBlock() { + } + + public NcSessionPayloadBlock(long totalSize, int proto, long dataLength, Object data) { + this.totalSize = totalSize; + this.proto = proto; + this.dataLength = dataLength; + this.data = data; + } + @Override - public byte getType() { + public byte type() { return TYPE; } + public long totalSize() { + return totalSize; + } + + public void totalSize(long totalSize) { + this.totalSize = totalSize; + } + public int proto() { return proto; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/block/NcSessionStatusBlock.java Wed Nov 22 14:47:24 2017 +0100 @@ -8,12 +8,19 @@ public static final byte TYPE = 4; - public static final int HEADER_SIZE = 14; - + public static final int SIZE = 2; + private byte status; + public NcSessionStatusBlock() { + } + + public NcSessionStatusBlock(byte status) { + this.status = status; + } + @Override - public byte getType() { + public byte type() { return TYPE; }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Tue Nov 21 15:43:20 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java Wed Nov 22 14:47:24 2017 +0100 @@ -56,8 +56,11 @@ assertEquals(NcSegmentBlock.TYPE, content[offset++]); long segmentSize = dataHelper.getLong8(content, offset); offset += 8; + long prevSegmentSize = dataHelper.getLong8(content, offset); + offset += 8; int segmentBlocks = dataHelper.getInt2(content, offset); offset += 2; + assertEquals(0, prevSegmentSize); assertEquals(2, segmentBlocks); //SessionInfoBlock