Mercurial > stress-tester
changeset 700:7955b2096163
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Tue Nov 28 11:46:55 2017 +0100 @@ -3,7 +3,7 @@ import com.passus.net.http.HttpMessageHelper; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; -import com.passus.st.client.http.HttpSessionPayloadEvent.HttpReqResp; +import com.passus.st.client.http.HttpReqResp; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.emitter.SessionInfo; @@ -43,26 +43,6 @@ return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName()); } - public static class HttpReqResp { - - private final HttpRequest request; - - private final HttpResponse response; - - public HttpReqResp(HttpRequest request, HttpResponse response) { - this.request = request; - this.response = response; - } - - public HttpRequest getRequest() { - return request; - } - - public HttpResponse getResponse() { - return response; - } - - } @Override public String toString() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReader.java Tue Nov 28 11:46:55 2017 +0100 @@ -9,8 +9,11 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpStatus; +import com.passus.st.client.http.HttpReqResp; +import com.passus.st.client.http.HttpSessionPayloadEvent; import static com.passus.st.reader.nc.NcHttpDataUtils.CUSTOM_HEADER_CODE; import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; +import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE; import java.io.IOException; /** @@ -23,8 +26,8 @@ private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; - private ByteString decodeVersion(ByteBuff buff) throws IOException { - byte b = buff.read(); + public ByteString decodeVersion(ByteBuff buffer) throws IOException { + byte b = buffer.read(); if (b == NcHttpDataUtils.VERSION_1_0) { return HttpConsts.VERSION_1_0; } else if (b == NcHttpDataUtils.VERSION_1_1) { @@ -34,7 +37,7 @@ throw new IOException("Not supported HTTP version '" + b + "'."); } - private HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException { + public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException { long headerSize = ncDataHelper.readLongVLC(buffer); int startIndex = buffer.startIndex(); @@ -62,6 +65,19 @@ return headers; } + public HttpRequest decodeRequest(ByteBuff buffer) throws IOException { + ByteString method = ncDataHelper.readByteStringNullTerminated(buffer); + ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer); + return new HttpRequest(uri, method); + } + + public HttpResponse decodeResponse(ByteBuff buffer) throws IOException { + int statusCode = dataHelper.readInt2(buffer); + ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer); + HttpStatus status = new HttpStatus(statusCode, reasonPhrase); + return new HttpResponse(status); + } + public HttpMessage decodeMessage(ByteBuff buffer) throws IOException { byte flags = buffer.read(); @@ -84,7 +100,23 @@ return msg; } - private HttpMessage read(NcDataBlockReader reader) throws IOException { + public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException { + byte flags = buffer.read(); + HttpRequest req = null; + if ((flags & FLAG_REQUEST) != 0) { + + } + + HttpResponse resp = null; + if ((flags & FLAG_RESPONSE) != 0) { + + } + + return new HttpReqResp(req, resp); + } + + public HttpSessionPayloadEvent read(NcDataBlockReader reader) throws IOException { + return null; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Tue Nov 28 11:46:55 2017 +0100 @@ -10,9 +10,11 @@ import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; +import com.passus.st.client.http.HttpReqResp; import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.emitter.SessionInfo; import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_REQUEST; +import static com.passus.st.reader.nc.NcHttpDataUtils.FLAG_RESPONSE; import java.io.IOException; import java.nio.ByteOrder; @@ -32,21 +34,21 @@ int code = NcHttpDataUtils.headerToCode(entry.getName()); if (code == -1) { headersBuff.append(NcHttpDataUtils.CUSTOM_HEADER_CODE); - ncDataHelper.writeByteStringNullTerminated(entry.getName(), headersBuff); + ncDataHelper.writeByteStringNullTerminated(headersBuff, entry.getName()); } else { headersBuff.append((byte) code); } - ncDataHelper.writeByteStringNullTerminated(entry.getValue(), headersBuff); + ncDataHelper.writeByteStringNullTerminated(headersBuff, entry.getValue()); } int len = headersBuff.readableBytes(); - len += ncDataHelper.writeLongVLC(headersBuff.readableBytes(), buff); + len += ncDataHelper.writeLongVLC(buff, headersBuff.readableBytes()); buff.append(headersBuff); return len; } - public void encodeVersion(HttpMessage msg, ByteBuff buff) throws IOException { + public long encodeVersion(HttpMessage msg, ByteBuff buff) throws IOException { if (HttpConsts.VERSION_1_0.equals(msg.getVersion())) { buff.append(NcHttpDataUtils.VERSION_1_0); } else if (HttpConsts.VERSION_1_1.equals(msg.getVersion())) { @@ -54,36 +56,55 @@ } else { throw new IOException("Not supported HTTP version '" + msg.getVersion() + "'."); } + + return 1; + } + + public long encodeRequest(HttpRequest req, ByteBuff buff) throws IOException { + long size = 0; + size += ncDataHelper.writeByteStringNullTerminated(buff, req.getMethod().toByteString()); + size += ncDataHelper.writeByteStringNullTerminated(buff, req.getUri()); + size += encodeVersion(req, buff); + size += encodeHeaders(req.getHeaders(), buff); + return size; + } + + public long encodeResponse(HttpResponse resp, ByteBuff buff) throws IOException { + long size = 0; + dataHelper.writeInt2(buff, resp.getStatus().getCode()); + size += 2; + size += ncDataHelper.writeByteStringNullTerminated(buff, resp.getStatus().getReasonPhrase()); + size += encodeVersion(resp, buff); + size += encodeHeaders(resp.getHeaders(), buff); + return size; } public long encodeMessage(HttpMessage msg, ByteBuff buff) throws IOException { + return encodeMessage(msg, buff, true); + } + + public long encodeMessage(HttpMessage msg, ByteBuff buff, boolean appendFlags) throws IOException { long size = 0; - byte flags = 0; if (msg.isRequest()) { - flags = FLAG_REQUEST; HttpRequest req = (HttpRequest) msg; - buff.append(flags); - size++; - size += ncDataHelper.writeByteStringNullTerminated(req.getMethod().toByteString(), buff); - size += ncDataHelper.writeByteStringNullTerminated(req.getUri(), buff); + if (appendFlags) { + buff.append(FLAG_REQUEST); + size++; + } - encodeVersion(msg, buff); - size++; + size += encodeRequest(req, buff); } else { HttpResponse resp = (HttpResponse) msg; - buff.append(flags); - size++; - dataHelper.writeInt2(buff, resp.getStatus().getCode()); - size += 2; - size += ncDataHelper.writeByteStringNullTerminated(resp.getStatus().getReasonPhrase(), buff); + if (appendFlags) { + buff.append(FLAG_RESPONSE); + size++; + } - encodeVersion(msg, buff); - size++; + size += encodeResponse(resp, buff); } - size += encodeHeaders(msg.getHeaders(), buff); return size; } @@ -93,6 +114,7 @@ DataSource content = msg.getContent(); if (content != null) { size += content.available(); + size += ncDataHelper.writeLongVLC(buffer, content.available()); } writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size); @@ -104,6 +126,50 @@ writer.closeSessionPayloadBlock(); } + public void encodeFullMessages(long timestamp, SessionInfo session, HttpReqResp messages, NcDataBlockWriter writer) throws IOException { + ByteBuff reqBuffer = new HeapByteBuff(); + ByteBuff respBuffer = new HeapByteBuff(); + byte flags = 0; + long size = 1; + HttpRequest req = messages.getRequest(); + DataSource reqContent = null; + if (req != null) { + flags |= FLAG_REQUEST; + size += encodeRequest(req, reqBuffer); + reqContent = req.getContent(); + if (reqContent != null) { + size += reqContent.available(); + size += ncDataHelper.writeLongVLC(reqBuffer, reqContent.available()); + } + } + + HttpResponse resp = messages.getResponse(); + DataSource respContent = null; + if (resp != null) { + flags |= FLAG_RESPONSE; + size += encodeResponse(resp, respBuffer); + respContent = resp.getContent(); + if (respContent != null) { + size += respContent.available(); + size += ncDataHelper.writeLongVLC(respBuffer, respContent.available()); + } + } + + writer.writeSessionPayloadHeader(timestamp, session, (byte) 1, size); + writer.writeSessionPayloadData(new byte[]{flags}); + writer.writeSessionPayloadData(reqBuffer); + if (reqContent != null) { + writer.writeSessionPayloadData(reqContent); + } + + writer.writeSessionPayloadData(respBuffer); + if (respContent != null) { + writer.writeSessionPayloadData(respContent); + } + + size++; + } + public void write(HttpSessionPayloadEvent event, NcDataBlockWriter writer) throws IOException { long time = event.getTimestamp(); SessionInfo session = event.getSessionInfo();
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlock.java Tue Nov 28 11:46:55 2017 +0100 @@ -6,6 +6,8 @@ */ public interface NcDataBlock { + public static final int HEADER_SIZE = 1; + public byte type(); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Tue Nov 28 11:46:55 2017 +0100 @@ -2,7 +2,6 @@ import com.passus.commons.Assert; import com.passus.data.ByteBuff; -import com.passus.data.DataHelper; import com.passus.data.HeapByteBuff; import com.passus.net.IpAddress; import com.passus.net.session.SessionBean; @@ -32,8 +31,6 @@ private NcHeader header; - private final DataHelper dataHelper = DataHelper.BIG_ENDIAN; - private final NcDataHelper ncDataHelper = NcDataHelper.getInstance(); private final Path path; @@ -216,9 +213,9 @@ checkBlockType(blockType, NcSegmentBlock.TYPE); } - long totalSize = dataHelper.readLong8(buffer); - long prevTotalSize = dataHelper.readLong8(buffer); - int blocksNum = dataHelper.readInt2(buffer); + long totalSize = ncDataHelper.readLong8(buffer); + long prevTotalSize = ncDataHelper.readLong8(buffer); + int blocksNum = ncDataHelper.readInt2(buffer); return new NcSegmentBlock(totalSize, prevTotalSize, blocksNum); } @@ -230,8 +227,8 @@ checkBlockType(blockType, NcSessionInfoBlock.TYPE); } - int totalSize = dataHelper.readInt4(buffer); - int sessionId = dataHelper.readInt4(buffer); + int totalSize = ncDataHelper.readInt4(buffer); + int sessionId = ncDataHelper.readInt4(buffer); String sourceName = ncDataHelper.readStringNullTerminated(buffer); byte transport = buffer.read(); if (transport != SessionBean.PROTOCOL_TCP && transport != SessionBean.PROTOCOL_UDP) { @@ -239,9 +236,9 @@ } IpAddress clientIp = ncDataHelper.readIpAddress(buffer); - int clientPort = dataHelper.readInt2(buffer); + int clientPort = ncDataHelper.readInt2(buffer); IpAddress serverIp = ncDataHelper.readIpAddress(buffer); - int serverPort = dataHelper.readInt2(buffer); + int serverPort = ncDataHelper.readInt2(buffer); SessionInfo sessionInfo = new SessionInfo(clientIp, clientPort, serverIp, serverPort, transport); sessionInfo.setSourceName(sourceName); @@ -260,8 +257,8 @@ checkBlockType(blockType, NcSessionStatusBlock.TYPE); } - long timestamp = dataHelper.readLong8(buffer); - int sessionId = dataHelper.readInt4(buffer); + long timestamp = ncDataHelper.readLong8(buffer); + int sessionId = ncDataHelper.readInt4(buffer); SessionInfo session = sessionIdMap.get(sessionId); if (session == null) { throw new IOException("Invalid session id '" + sessionId + "'."); @@ -272,25 +269,25 @@ } private NcSessionPayloadBlock readSessionPayloadBlockHeader(boolean skipBlockType) throws IOException { - read(NcSessionPayloadBlock.MAX_HEADER_SIZE); + read(NcSessionPayloadBlock.HEADER_SIZE); if (!skipBlockType) { byte blockType = buffer.read(); checkBlockType(blockType, NcSessionPayloadBlock.TYPE); } - long timestamp = dataHelper.readLong8(buffer); - int sessionId = dataHelper.readInt4(buffer); + long timestamp = ncDataHelper.readLong8(buffer); + int sessionId = ncDataHelper.readInt4(buffer); SessionInfo session = sessionIdMap.get(sessionId); if (session == null) { throw new IOException("Invalid session id '" + sessionId + "'."); } - //long totalSize = dataHelper.readLong8(buffer); - int proto = dataHelper.readInt2(buffer); - long dataLength = ncDataHelper.readLongVLC(buffer); - long optionsLength = ncDataHelper.readLongVLC(buffer); - return new NcSessionPayloadBlock(timestamp, sessionId, session, -1, proto, dataLength, optionsLength, null, null); + long totalSize = ncDataHelper.readLong6(buffer); + long dataSize = ncDataHelper.readLong6(buffer); + int proto = ncDataHelper.readInt2(buffer); + + return new NcSessionPayloadBlock(timestamp, sessionId, session, totalSize, dataSize, proto, null, null); } private int readSessionPayloadBlockContent(ByteBuff data, int length) throws IOException { @@ -311,21 +308,21 @@ NcSessionPayloadBlock payloadBlock = readSessionPayloadBlockHeader(skipBlockType); if (!skipData) { - if (payloadBlock.dataLength() <= Integer.MAX_VALUE) { - int dataLength = (int) payloadBlock.dataLength(); - ByteBuff data = new HeapByteBuff(dataLength); - readSessionPayloadBlockContent(data, dataLength); + if (payloadBlock.dataSize() <= Integer.MAX_VALUE) { + int dataSize = (int) payloadBlock.dataSize(); + ByteBuff data = new HeapByteBuff(dataSize); + readSessionPayloadBlockContent(data, dataSize); payloadBlock.data(data); } else { long startPosition = position(); - long endPosition = startPosition + payloadBlock.dataLength(); + long endPosition = startPosition + payloadBlock.dataSize(); ReadonlyFileChannelDataSource ds = new ReadonlyFileChannelDataSource(ch, path, startPosition, endPosition); payloadBlock.data(ds); } } Map<String, Object> options = null; - if (!skipOptions && payloadBlock.optionsLength() != 0) { + if (!skipOptions && payloadBlock.optionsSize() != 0) { throw new RuntimeException("optionsLength != 0 - not implemented yet"); } @@ -341,8 +338,7 @@ } public int readSessionPayloadContent(ByteBuffer data, long length) { - - return 0; + throw new RuntimeException("Not implemented yet."); } @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Tue Nov 28 11:46:55 2017 +0100 @@ -3,6 +3,7 @@ import com.passus.commons.Assert; import com.passus.data.ByteBuff; import com.passus.data.ByteBuffDataSource; +import com.passus.data.DataHelper; import com.passus.data.DataSource; import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockWriter; @@ -283,11 +284,11 @@ int sizePosition = buffer.position(); buffer.putInt(0); buffer.putInt(sessionId); - size += ncDataHelper.writeStringNullTerminated(sessionInfo.getSourceName(), buffer); + size += ncDataHelper.writeStringNullTerminated(buffer, sessionInfo.getSourceName()); buffer.put((byte) sessionInfo.getTransport()); - size += ncDataHelper.writeIpAddress(sessionInfo.getSrcIp(), buffer); + size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getSrcIp()); buffer.putShort((short) sessionInfo.getSrcPort()); - size += ncDataHelper.writeIpAddress(sessionInfo.getDstIp(), buffer); + size += ncDataHelper.writeIpAddress(buffer, sessionInfo.getDstIp()); buffer.putShort((short) sessionInfo.getDstPort()); closeBlock(NcSessionInfoBlock.TYPE, false); buffer.putInt(sizePosition, size); @@ -313,7 +314,7 @@ updateSegmentInfo(NcSessionStatusBlock.HEADER_SIZE); } - public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataLength) throws IOException { + public void writeSessionPayloadHeader(long timestamp, SessionInfo session, int proto, long dataSize) throws IOException { checkOpened(); int sessionId = getSessionId(session, opened); @@ -322,16 +323,11 @@ buffer.putLong(timestamp); buffer.putInt(sessionId); - //int totalSizePosition = buffer.position(); - //buffer.putLong(0L); + ncDataHelper.writeLong6(buffer, NcSessionPayloadBlock.HEADER_SIZE + dataSize); + ncDataHelper.writeLong6(buffer, dataSize); buffer.putShort((short) proto); - int size = 18; - size += ncDataHelper.writeLongVLC(dataLength, buffer); - size += ncDataHelper.writeLongVLC(0, buffer); // OptionsLength - //buffer.putLong(totalSizePosition, size); - - updateSegmentInfo(size); + updateSegmentInfo(NcSessionPayloadBlock.HEADER_SIZE); writeBuffer(); currentBlockStage = BLOCK_STAGE_CONTENT_WRITE; }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataHelper.java Tue Nov 28 11:46:55 2017 +0100 @@ -4,6 +4,7 @@ import com.passus.data.ByteBuff; import com.passus.data.ByteBuffUtils; import com.passus.data.ByteString; +import com.passus.data.DataHelper.BigEndianDataHelper; import com.passus.data.DataUtils; import com.passus.net.Ip4Address; import com.passus.net.Ip6Address; @@ -17,7 +18,7 @@ * * @author Mirosław Hawrot */ -public class NcDataHelper { +public class NcDataHelper extends BigEndianDataHelper { public static final byte VERSION_MAJOR = 1; @@ -98,27 +99,27 @@ return readed; } - public int writeIntVLC(int value, ByteBuff buffer) { - int len = writeIntVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity()); + public int writeIntVLC(ByteBuff buffer, int value) { + int len = writeIntVLC(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value); buffer.endIndex(buffer.endIndex() + len); return len; } - public int writeIntVLC(int value, ByteBuffer buffer) { - int len = writeIntVLC(value, buffer.array(), buffer.position(), buffer.limit()); + public int writeIntVLC(ByteBuffer buffer, int value) { + int len = writeIntVLC(buffer.array(), buffer.position(), buffer.limit(), value); buffer.position(buffer.position() + len); return len; } - public int writeIntVLC(int value, byte[] data) { - return writeIntVLC(value, data, 0, data.length); + public int writeIntVLC(byte[] data, int value) { + return writeIntVLC(data, 0, data.length, value); } - public int writeIntVLC(int value, byte[] data, int startIndex) { - return writeIntVLC(value, data, startIndex, data.length); + public int writeIntVLC(byte[] data, int startIndex, int value) { + return writeIntVLC(data, startIndex, data.length, value); } - public int writeIntVLC(int value, byte[] data, int startIndex, int endIndex) { + public int writeIntVLC(byte[] data, int startIndex, int endIndex, int value) { if (value < VLC_INT_TRESHOLD) { data[startIndex] = (byte) value; return 1; @@ -169,27 +170,36 @@ return readed; } - public int writeLongVLC(long value, ByteBuff buffer) { - int len = writeLongVLC(value, buffer.buffer(), buffer.endIndex(), buffer.capacity()); + public void writeLong6(ByteBuffer buffer, long val) { + writeLongN(buffer, 6, val); + } + + public void writeLongN(ByteBuffer buffer, int bytes, long val) { + writeLongN(buffer.array(), buffer.position(), bytes, val); + buffer.position(buffer.position() + bytes); + } + + public int writeLongVLC(ByteBuff buffer, long value) { + int len = writeLongVLC(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value); buffer.endIndex(buffer.endIndex() + len); return len; } - public int writeLongVLC(long value, ByteBuffer buffer) { - int len = writeLongVLC(value, buffer.array(), buffer.position(), buffer.limit()); + public int writeLongVLC(ByteBuffer buffer, long value) { + int len = writeLongVLC(buffer.array(), buffer.position(), buffer.limit(), value); buffer.position(buffer.position() + len); return len; } - public int writeLongVLC(long value, byte[] data) { - return writeLongVLC(value, data, 0, data.length); + public int writeLongVLC(byte[] data, long value) { + return writeLongVLC(data, 0, data.length, value); } - public int writeLongVLC(long value, byte[] data, int startIndex) { - return writeLongVLC(value, data, startIndex, data.length); + public int writeLongVLC(byte[] data, int startIndex, long value) { + return writeLongVLC(data, startIndex, data.length, value); } - public int writeLongVLC(long value, byte[] data, int startIndex, int endIndex) { + public int writeLongVLC(byte[] data, int startIndex, int endIndex, long value) { if (value < VLC_LONG_TRESHOLD) { if (endIndex - startIndex < 2) { throw new IndexOutOfBoundsException(); @@ -211,28 +221,28 @@ } } - public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer) { - return writeByteStringNullTerminated(value, buffer, true); + public int writeByteStringNullTerminated(ByteBuff buffer, ByteString value) { + return writeByteStringNullTerminated(buffer, value, true); } - public int writeByteStringNullTerminated(ByteString value, ByteBuff buffer, boolean sanitize) { + public int writeByteStringNullTerminated(ByteBuff buffer, ByteString value, boolean sanitize) { buffer.ensureCapacity(value.length()); - int len = writeByteStringNullTerminated(value, buffer.buffer(), buffer.endIndex(), buffer.capacity(), sanitize); + int len = writeByteStringNullTerminated(buffer.buffer(), buffer.endIndex(), buffer.capacity(), value, sanitize); buffer.endIndex(buffer.endIndex() + len); return len; } - public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer) { - return writeByteStringNullTerminated(value, buffer, true); + public int writeByteStringNullTerminated(ByteBuffer buffer, ByteString value) { + return writeByteStringNullTerminated(buffer, value, true); } - public int writeByteStringNullTerminated(ByteString value, ByteBuffer buffer, boolean sanitize) { - int len = writeByteStringNullTerminated(value, buffer.array(), buffer.position(), buffer.limit(), sanitize); + public int writeByteStringNullTerminated(ByteBuffer buffer, ByteString value, boolean sanitize) { + int len = writeByteStringNullTerminated(buffer.array(), buffer.position(), buffer.limit(), value, sanitize); buffer.limit(buffer.limit() + len); return len; } - public int writeByteStringNullTerminated(ByteString value, byte data[], int startIndex, int endIndex, boolean sanitize) { + public int writeByteStringNullTerminated(byte data[], int startIndex, int endIndex, ByteString value, boolean sanitize) { int len = 0; byte[] valData = value.getBytes(); if (!value.isEmpty()) { @@ -268,7 +278,7 @@ throw new IllegalArgumentException("Unable to find NULL delimiter."); } - public int writeStringNullTerminated(String value, ByteBuff buffer) { + public int writeStringNullTerminated(ByteBuff buffer, String value) { if (value != null && !value.isEmpty()) { byte[] data = value.getBytes(CHARSET); buffer.append(data); @@ -280,7 +290,7 @@ return 1; } - public int writeStringNullTerminated(String value, ByteBuffer buffer) { + public int writeStringNullTerminated(ByteBuffer buffer, String value) { if (value != null && !value.isEmpty()) { byte[] data = value.getBytes(CHARSET); buffer.put(data); @@ -292,7 +302,7 @@ return 1; } - public int writeIpAddress(IpAddress ip, ByteBuffer buffer) { + public int writeIpAddress(ByteBuffer buffer, IpAddress ip) { byte[] data = ip.getAddress(); buffer.put((byte) ip.getVersion()); buffer.put(data);
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpDataUtils.java Tue Nov 28 11:46:55 2017 +0100 @@ -14,14 +14,16 @@ public static final byte FLAG_REQUEST = (byte) 0b10000000; - public static final byte FLAG_ENCODED = (byte) 0b01000000; + public static final byte FLAG_RESPONSE = (byte) 0b01000000; - public static final byte FLAG_COMPRESSED = (byte) 0b00100000; + public static final byte FLAG_ENCODED = (byte) 0b00100000; + + public static final byte FLAG_COMPRESSED = (byte) 0b00010000; public static final byte VERSION_1_0 = 1; - + public static final byte VERSION_1_1 = 2; - + public static final int CUSTOM_HEADER_CODE = 0; private static final Map<Byte, ByteString> CODE_TO_HEADER;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionBlock.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionBlock.java Tue Nov 28 11:46:55 2017 +0100 @@ -8,7 +8,7 @@ */ public abstract class NcSessionBlock implements NcDataBlock { - public static final int HEADER_SIZE = 12; + public static final int HEADER_SIZE = 12 + NcDataBlock.HEADER_SIZE; private long timestamp;
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcSessionPayloadBlock.java Tue Nov 28 11:46:55 2017 +0100 @@ -11,15 +11,15 @@ public static final byte TYPE = 3; - public static final int MAX_HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 29; + public static final int HEADER_SIZE = NcSessionBlock.HEADER_SIZE + 14; private long totalSize; - private int proto; + private long dataSize; - private long dataLength; + private long optionsSize; - private long optionsLength = 0; + private int proto; private Object data; @@ -29,13 +29,13 @@ } public NcSessionPayloadBlock(long timestamp, int sessionId, SessionInfo sessionInfo, - long totalSize, int proto, long dataLength, long optionsLength, + long totalSize, long dataSize, int proto, Object data, Map<String, Object> options) { super(timestamp, sessionId, sessionInfo); this.totalSize = totalSize; + this.dataSize = dataSize; + this.optionsSize = totalSize - HEADER_SIZE - dataSize; this.proto = proto; - this.dataLength = dataLength; - this.optionsLength = optionsLength; this.options = options; this.data = data; } @@ -61,20 +61,16 @@ this.proto = proto; } - public long dataLength() { - return dataLength; + public long dataSize() { + return dataSize; } - public void dataLength(long dataLength) { - this.dataLength = dataLength; + public void dataSize(long dataLength) { + this.dataSize = dataLength; } - public long optionsLength() { - return optionsLength; - } - - public void optionsLength(long optionsLength) { - this.optionsLength = optionsLength; + public long optionsSize() { + return optionsSize; } public Object data() {
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/OptionsReaderWriter.java Tue Nov 28 11:46:55 2017 +0100 @@ -14,7 +14,7 @@ public static NcDataHelper NC_HELPER = NcDataHelper.getInstance(); public void writeOptions(List<Option> options, ByteBuff buff) { - NC_HELPER.writeIntVLC(options.size(), buff); + NC_HELPER.writeIntVLC(buff, options.size()); for (Option option : options) { String name = option.getName(); @@ -26,7 +26,7 @@ buff.append(code); if (code == 0) { - NC_HELPER.writeStringNullTerminated(name, buff); + NC_HELPER.writeStringNullTerminated(buff, name); } coder.encode(option.getValue(), buff); }
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoders.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/ValueCoders.java Tue Nov 28 11:46:55 2017 +0100 @@ -31,7 +31,7 @@ @Override public void encode(Integer value, ByteBuff buff) { - NC_HELPER.writeIntVLC(value, buff); + NC_HELPER.writeIntVLC(buff, value); } @Override @@ -57,7 +57,7 @@ @Override public void encode(Long value, ByteBuff buff) { - NC_HELPER.writeLongVLC(value, buff); + NC_HELPER.writeLongVLC(buff, value); } @Override @@ -70,7 +70,7 @@ @Override public void encode(ByteString value, ByteBuff buff) { - NC_HELPER.writeByteStringNullTerminated(value, buff); + NC_HELPER.writeByteStringNullTerminated(buff, value); } @Override @@ -83,7 +83,7 @@ @Override public void encode(String value, ByteBuff buff) { - NC_HELPER.writeStringNullTerminated(value, buff); + NC_HELPER.writeStringNullTerminated(buff, value); } @Override
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/option/VarOptionCoder.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/option/VarOptionCoder.java Tue Nov 28 11:46:55 2017 +0100 @@ -13,9 +13,9 @@ @Override public void encode(VarOption value, ByteBuff buff) { - NC_HELPER.writeStringNullTerminated(value.getName(), buff); - NC_HELPER.writeLongVLC(value.getStartOffset(), buff); - NC_HELPER.writeLongVLC(value.getEndOffset(), buff); + NC_HELPER.writeStringNullTerminated(buff, value.getName()); + NC_HELPER.writeLongVLC(buff, value.getStartOffset()); + NC_HELPER.writeLongVLC(buff, value.getEndOffset()); } @Override
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Tue Nov 28 11:46:55 2017 +0100 @@ -5,8 +5,6 @@ import com.passus.commons.service.ServiceException; import com.passus.data.ByteBuff; import com.passus.data.ByteBuffDataSource; -import com.passus.net.http.HttpRequest; -import com.passus.net.session.SessionKey; import com.passus.st.client.EventHandler; import com.passus.st.client.SessionStatusEvent; import com.passus.st.plugin.PluginConstants; @@ -15,7 +13,8 @@ import com.passus.st.reader.nc.NcSessionPayloadBlock; import com.passus.st.reader.nc.NcSessionStatusBlock; import java.io.IOException; -import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * @@ -24,6 +23,8 @@ @Plugin(name = NcEventSource.TYPE, category = PluginConstants.CATEGORY_EVENT_SOURCE) public class NcEventSource implements EventSource { + private static final Logger LOGGER = LogManager.getLogger(NcEventSource.class); + public static final String TYPE = "nc"; private String ncFile; @@ -38,6 +39,8 @@ private String name; + private ReaderThread readerThread; + @Override public String getType() { return TYPE; @@ -100,6 +103,16 @@ } } + if (readerThread != null) { + readerThread.working = false; + readerThread.interrupt(); + + try { + readerThread.join(5_000); + } catch (InterruptedException ignore) { + } + } + started = false; } @@ -112,26 +125,40 @@ } } - private void read() throws IOException { - while (!reader.eof()) { - byte blockType = reader.peekBlockType(); - switch (blockType) { - case NcSessionStatusBlock.TYPE: - NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read(); - SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status()); - handler.handle(event); - break; - case NcSessionPayloadBlock.TYPE: - NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read(); - ByteBuff payload = readPayload(payloadBlock.data()); + private class ReaderThread extends Thread { - /*HttpMessage msg = httpReader.readMessage(payload); + private boolean working = false; + + @Override + public void run() { + try { + working = true; + while (working && !reader.eof()) { + byte blockType = reader.peekBlockType(); + switch (blockType) { + case NcSessionStatusBlock.TYPE: + NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read(); + SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status()); + handler.handle(event); + break; + case NcSessionPayloadBlock.TYPE: + NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read(); + ByteBuff payload = readPayload(payloadBlock.data()); + + /*HttpMessage msg = httpReader.readMessage(payload); handler.handle(event);*/ - break; - default: - reader.read(); + break; + default: + reader.read(); + } + } + } catch (IOException e) { + LOGGER.debug(e.getMessage(), e); } + + working = false; } + } }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataReaderTest.java Tue Nov 28 11:46:55 2017 +0100 @@ -23,7 +23,7 @@ } @Test - public void testDecode_Request() throws Exception { + public void testDecodeMessage_Request() throws Exception { ByteBuff buffer = new HeapByteBuff(); HttpRequest req = HttpRequestBuilder.get("http://test.com/test") .header("X-Header", "X-Header-Value") @@ -36,7 +36,7 @@ } @Test - public void testDecode_Response() throws Exception { + public void testDecodeMessage_Response() throws Exception { ByteBuff buffer = new HeapByteBuff(); HttpResponse resp = HttpResponseBuilder.ok() .header("X-Header", "X-Header-Value")
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Tue Nov 28 11:46:55 2017 +0100 @@ -10,12 +10,7 @@ import java.io.IOException; import java.util.UUID; import org.apache.commons.io.FileUtils; -import static org.mockito.Mockito.mock; import static org.testng.Assert.*; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /**
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Tue Nov 28 11:46:55 2017 +0100 @@ -70,8 +70,8 @@ assertEquals(NcSessionPayloadBlock.TYPE, block.type()); NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) block; assertSame(sessionInfoBlock.sessionInfo(), payloadBlock.sessionInfo()); - assertEquals(data.length, payloadBlock.dataLength()); - assertEquals(0, payloadBlock.optionsLength()); + assertEquals(data.length, payloadBlock.dataSize()); + assertEquals(0, payloadBlock.optionsSize()); assertTrue(payloadBlock.data() instanceof ByteBuff); ByteBuff payload = (ByteBuff) payloadBlock.data(); assertEquals(data, payload.toArray());
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java Tue Nov 28 11:08:29 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataHelperTest.java Tue Nov 28 11:46:55 2017 +0100 @@ -73,7 +73,7 @@ @Test(dataProvider = "validIntVLC") public void testWriteIntVLC(byte[] expectedResult, int expectedBytes, int length) { byte[] result = new byte[expectedResult.length]; - helper.writeIntVLC(length, result); + helper.writeIntVLC(result, length); assertEquals(expectedResult, result); } @@ -102,7 +102,7 @@ @Test(dataProvider = "validLongVLC") public void testWriteLongVLC(byte[] expectedResult, int expectedBytes, long length) { byte[] result = new byte[expectedResult.length]; - helper.writeLongVLC(length, result); + helper.writeLongVLC(result, length); assertEquals(expectedResult, result); } @@ -110,19 +110,19 @@ public void testWriteStringNullTerminated() { ByteBuffer buffer = ByteBuffer.allocate(256); - helper.writeStringNullTerminated("", buffer); + helper.writeStringNullTerminated(buffer, ""); buffer.flip(); assertEquals(1, buffer.remaining()); assertEquals(AsciiUtils.NUL, buffer.get()); buffer.clear(); - helper.writeStringNullTerminated(null, buffer); + helper.writeStringNullTerminated(buffer, null); buffer.flip(); assertEquals(1, buffer.remaining()); assertEquals(AsciiUtils.NUL, buffer.get()); buffer.clear(); - helper.writeStringNullTerminated("123", buffer); + helper.writeStringNullTerminated(buffer, "123"); buffer.flip(); assertEquals(4, buffer.remaining()); assertEquals(0x31, buffer.get());