Mercurial > stress-tester
changeset 698:f248f2e25d10
NC file in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriter.java Mon Nov 27 14:23:02 2017 +0100 @@ -87,7 +87,7 @@ return size; } - private void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException { + public void encodeFullMessage(long timestamp, SessionInfo session, HttpMessage msg, NcDataBlockWriter writer) throws IOException { ByteBuff buffer = new HeapByteBuff(); long size = encodeMessage(msg, buffer); DataSource content = msg.getContent();
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Mon Nov 27 14:23:02 2017 +0100 @@ -348,11 +348,11 @@ @Override public NcDataBlock read() throws IOException { checkOpened(); + read(1); if (eof && buffer.isEmpty()) { return null; } - read(1); byte blockType = buffer.read(); switch (blockType) { case NcSegmentBlock.TYPE:
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Mon Nov 27 14:23:02 2017 +0100 @@ -374,7 +374,7 @@ ByteBuffer dataBuffer = wrapData(data); int dataSize = dataBuffer.remaining(); - write(dataBuffer); + ch.write(dataBuffer); updateSegmentInfo(dataSize); }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Mon Nov 27 14:23:02 2017 +0100 @@ -12,6 +12,7 @@ import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.reader.nc.HttpSessionPayloadEventDataWriter; import com.passus.st.reader.nc.NcDataBlockWriter; +import java.io.File; import java.io.IOException; /** @@ -30,15 +31,42 @@ private final HttpResponseEncoder responseEncoder = HttpResponseEncoder.getInstance(); - private HttpSessionPayloadEventDataWriter httpPayloadWriter = new HttpSessionPayloadEventDataWriter(); + private final HttpSessionPayloadEventDataWriter httpPayloadWriter = new HttpSessionPayloadEventDataWriter(); - private boolean encodeData; + private boolean encodeData = true; + + public NcEventDestination() { + } + + public NcEventDestination(String ncFile) { + this.ncFile = ncFile; + } + + public NcEventDestination(File file) { + this(file.getAbsolutePath()); + } + + public String getNcFile() { + return ncFile; + } + + public void setNcFile(String ncFile) { + this.ncFile = ncFile; + } @Override public boolean isStarted() { return started; } + public boolean isEncodeData() { + return encodeData; + } + + public void setEncodeData(boolean encodeData) { + this.encodeData = encodeData; + } + @Override public void start() { if (started) { @@ -117,17 +145,12 @@ case HttpSessionPayloadEvent.TYPE: HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - if (payloadEvent.getRequest() != null) { - ByteBuff buff = new HeapByteBuff(); - requestEncoder.encode(payloadEvent.getRequest(), buff); - writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff); + if (encodeData) { + writeEncoded(payloadEvent); + } else { + writeNotEncoded(payloadEvent); } - if (payloadEvent.getResponse() != null) { - ByteBuff buff = new HeapByteBuff(); - 
responseEncoder.encode(payloadEvent.getResponse(), buff); - writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff); - } break; case DataEnd.TYPE: stop();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,190 @@
+package com.passus.st.source;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.commons.service.ServiceException;
+import com.passus.data.ByteBuff;
+import com.passus.data.ByteBuffDataSource;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.session.SessionKey;
+import com.passus.st.client.EventHandler;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.plugin.PluginConstants;
+import com.passus.st.reader.nc.HttpSessionPayloadEventDataReader;
+import com.passus.st.reader.nc.NcDataBlockReader;
+import com.passus.st.reader.nc.NcSessionPayloadBlock;
+import com.passus.st.reader.nc.NcSessionStatusBlock;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Event source that reads events from an NC file through an
+ * {@link NcDataBlockReader}.
+ *
+ * <p>Work in progress: session status blocks are forwarded to the handler;
+ * payload blocks are read but not yet decoded into HTTP messages.
+ *
+ * @author Mirosław Hawrot
+ */
+@Plugin(name = NcEventSource.TYPE, category = PluginConstants.CATEGORY_EVENT_SOURCE)
+public class NcEventSource implements EventSource {
+
+    public static final String TYPE = "nc";
+
+    /** Path of the NC file opened by {@link #start()}. */
+    private String ncFile;
+
+    private NcDataBlockReader reader;
+
+    private volatile boolean started;
+
+    private EventHandler handler;
+
+    // Not initialised yet; reserved for decoding payload blocks in read().
+    private HttpSessionPayloadEventDataReader httpReader;
+
+    private String name;
+
+    public NcEventSource() {
+    }
+
+    /**
+     * @param ncFile path of the NC file to read
+     */
+    public NcEventSource(String ncFile) {
+        this.ncFile = ncFile;
+    }
+
+    /**
+     * @param file NC file to read; its absolute path is stored
+     */
+    public NcEventSource(File file) {
+        this(file.getAbsolutePath());
+    }
+
+    public String getNcFile() {
+        return ncFile;
+    }
+
+    public void setNcFile(String ncFile) {
+        this.ncFile = ncFile;
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public EventHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void setHandler(EventHandler handler) {
+        Assert.notNull(handler, "handler");
+        this.handler = handler;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Opens the reader on the configured NC file. Does nothing when already
+     * started.
+     *
+     * @throws ServiceException if the reader cannot be created or opened
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            reader = new NcDataBlockReader(ncFile);
+            reader.open();
+            started = true;
+        } catch (Exception e) {
+            // Release whatever was opened before reporting the failure.
+            stop0();
+            throw new ServiceException("Unable to start NcEventSource. " + e.getMessage(), e);
+        }
+    }
+
+    /** Closes the reader if the source is started. */
+    @Override
+    public void stop() {
+        if (started) {
+            stop0();
+        }
+    }
+
+    private void stop0() {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (Exception ignored) {
+                // Best-effort close: a failure here must not mask the
+                // original error or prevent the source from stopping.
+            }
+        }
+
+        started = false;
+    }
+
+    /**
+     * Extracts the byte buffer from a payload block's data object.
+     *
+     * @throws IllegalArgumentException if data is not a ByteBuffDataSource
+     */
+    private ByteBuff readPayload(Object data) {
+        if (data instanceof ByteBuffDataSource) {
+            ByteBuffDataSource ds = (ByteBuffDataSource) data;
+            return ds.getByteBuffer();
+        } else {
+            throw new IllegalArgumentException("Not supported data object.");
+        }
+    }
+
+    /**
+     * Reads blocks until the reader reports end of file, dispatching session
+     * status blocks to the handler.
+     */
+    private void read() throws IOException {
+        while (!reader.eof()) {
+            byte blockType = reader.peekBlockType();
+            switch (blockType) {
+                case NcSessionStatusBlock.TYPE:
+                    NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
+                    SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
+                    handler.handle(event);
+                    break;
+                case NcSessionPayloadBlock.TYPE:
+                    NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
+                    ByteBuff payload = readPayload(payloadBlock.data());
+
+                    /*HttpMessage msg = httpReader.readMessage(payload);
+                    handler.handle(event);*/
+                    break;
+                default:
+                    // Other block types are read and discarded.
+                    reader.read();
+            }
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/HttpSessionPayloadEventDataWriterTest.java Mon Nov 27 14:23:02 2017 +0100 @@ -1,5 +1,7 @@ package com.passus.st.reader.nc; +import com.passus.data.HeapByteBuff; +import com.passus.net.http.HttpHeaders; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpRequestBuilder; import com.passus.st.client.http.HttpSessionPayloadEvent; @@ -38,6 +40,18 @@ } @Test + public void testEncodeHeaders() throws Exception { + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.HOST, "test"); + headers.add("X-HEADER", "X-HEADER-VALUE"); + HeapByteBuff buffer = new HeapByteBuff(); + + HttpSessionPayloadEventDataWriter payloadWriter = new HttpSessionPayloadEventDataWriter(); + payloadWriter.encodeHeaders(headers, buffer); + assertEquals(33, buffer.readableBytes()); + } + + @Test public void testWrite_HttpRequest() throws Exception { File file = createTmpFile(); try {
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockReaderTest.java Mon Nov 27 14:23:02 2017 +0100 @@ -23,6 +23,7 @@ public NcDataBlockReaderTest() throws ParseException { session = new SessionInfo("1.1.1.1:5000", "2.2.2.2:80"); + session.setSourceName("test"); } private File createTmpFile() throws IOException {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventDestinationTest.java Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,77 @@
+package com.passus.st.source;
+
+import static com.passus.commons.utils.ResourceUtils.createTmpFile;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.reader.nc.NcDataBlock;
+import com.passus.st.reader.nc.NcSegmentBlock;
+import com.passus.st.reader.nc.NcSessionInfoBlock;
+import com.passus.st.reader.nc.NcSessionPayloadBlock;
+import com.passus.st.reader.nc.NcSessionStatusBlock;
+import com.passus.st.utils.EventUtils;
+import com.passus.st.utils.NcDataBlockReaderUtils;
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.Test;
+
+/**
+ * Checks the sequence of blocks that {@link NcEventDestination} writes to an
+ * NC file.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NcEventDestinationTest {
+
+    /**
+     * Replays the events read from a request/response capture into a
+     * destination, reads the file back and checks each block's type.
+     */
+    @Test
+    public void testHandle() throws Exception {
+        Properties props = new Properties();
+        props.put("allowPartialSession", "true");
+        props.put("ports", "4214");
+        List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
+        assertEquals(4, events.size());
+
+        HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) events.get(0);
+        SessionStatusEvent statusEvent = (SessionStatusEvent) events.get(1);
+        SessionInfo sessionInfo = payloadEvent.getSessionInfo();
+
+        File tmpFile = createTmpFile(false);
+        NcEventDestination eventDst = null;
+        try {
+            eventDst = new NcEventDestination(tmpFile);
+            eventDst.start();
+            events.forEach(eventDst::handle);
+            eventDst.stop();
+
+            List<NcDataBlock> blocks = NcDataBlockReaderUtils.readAll(tmpFile);
+            assertEquals(6, blocks.size());
+            assertEquals(NcSegmentBlock.TYPE, blocks.get(0).type());
+            assertEquals(NcSessionInfoBlock.TYPE, blocks.get(1).type());
+            NcSessionInfoBlock infoBlock = (NcSessionInfoBlock) blocks.get(1);
+            assertEquals(sessionInfo, infoBlock.sessionInfo());
+
+            assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(2).type());
+            assertEquals(NcSessionPayloadBlock.TYPE, blocks.get(3).type());
+            assertEquals(NcSessionStatusBlock.TYPE, blocks.get(4).type());
+            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) blocks.get(4);
+            assertEquals(sessionInfo, statusBlock.sessionInfo());
+            assertEquals(statusEvent.getStatus(), statusBlock.status());
+
+            assertEquals(NcSegmentBlock.TYPE, blocks.get(5).type());
+        } finally {
+            // Release the file when a failure skipped the stop() call above.
+            if (eventDst != null && eventDst.isStarted()) {
+                eventDst.stop();
+            }
+            tmpFile.delete();
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Mon Nov 27 13:33:13 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/utils/HttpMessageAssert.java Mon Nov 27 14:23:02 2017 +0100 @@ -1,6 +1,8 @@ package com.passus.st.utils; +import com.passus.data.ByteBuff; import com.passus.data.ByteString; +import com.passus.data.HeapByteBuff; import com.passus.net.http.HttpHeaders; import com.passus.net.http.HttpMessage; import com.passus.net.http.HttpRequest; @@ -32,6 +34,23 @@ } } + public static void assertMessagesContent(HttpMessage expectedMsg, HttpMessage msg) { + if (expectedMsg == msg) { + return; + } + + try { + ByteBuff content1 = new HeapByteBuff(); + ByteBuff content2 = new HeapByteBuff(); + expectedMsg.getContent().write(content1); + msg.getContent().write(content2); + assertEquals(content1, content2); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + } + public static void assertRequests(HttpRequest expectedReq, HttpRequest req) { if (expectedReq == req) { return; @@ -74,4 +93,5 @@ assertEquals(values2, values1); } } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/NcDataBlockReaderUtils.java Mon Nov 27 14:23:02 2017 +0100
@@ -0,0 +1,61 @@
+package com.passus.st.utils;
+
+import com.passus.st.reader.nc.NcDataBlock;
+import com.passus.st.reader.nc.NcDataBlockReader;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * Helper for reading the blocks of an NC file into a list.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NcDataBlockReaderUtils {
+
+    private NcDataBlockReaderUtils() {
+    }
+
+    /** Reads every block of {@code file}. */
+    public static List<NcDataBlock> readAll(File file) throws IOException {
+        return readAll(file.getAbsolutePath(), null);
+    }
+
+    /** Reads every block of the file named {@code fileName}. */
+    public static List<NcDataBlock> readAll(String fileName) throws IOException {
+        return readAll(fileName, null);
+    }
+
+    /** Reads the blocks of {@code file} accepted by {@code filter}. */
+    public static List<NcDataBlock> readAll(File file, Predicate<NcDataBlock> filter) throws IOException {
+        String path = file.getAbsolutePath();
+        return readAll(path, filter);
+    }
+
+    /**
+     * Opens the named file and collects its blocks until the reader returns
+     * {@code null}.
+     *
+     * @param fileName path of the NC file
+     * @param filter blocks failing this test are left out; {@code null} keeps all
+     * @return the collected blocks, in the order they were read
+     * @throws IOException propagated from the reader
+     */
+    public static List<NcDataBlock> readAll(String fileName, Predicate<NcDataBlock> filter) throws IOException {
+        List<NcDataBlock> result = new ArrayList<>();
+        try (NcDataBlockReader reader = new NcDataBlockReader(fileName)) {
+            reader.open();
+
+            for (NcDataBlock current = reader.read(); current != null; current = reader.read()) {
+                if (filter == null || filter.test(current)) {
+                    result.add(current);
+                }
+            }
+        }
+
+        return result;
+    }
+
+}