Mercurial > stress-tester
changeset 763:cb51fb3a6ebd
NcEventSource.parallel option
author | Devel 2 |
---|---|
date | Fri, 15 Dec 2017 13:28:24 +0100 |
parents | 6bb60e93eb43 |
children | dc80677e532f |
files | stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java |
diffstat | 3 files changed, 56 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Fri Dec 15 11:30:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java Fri Dec 15 13:28:24 2017 +0100 @@ -140,6 +140,10 @@ } private int read(int requiredBytes) throws IOException { + return read(requiredBytes, buffer); + } + + private int read(int requiredBytes, ByteBuff buffer) throws IOException { if (eof) { return -1; }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri Dec 15 11:30:43 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Fri Dec 15 13:28:24 2017 +0100 @@ -46,6 +46,8 @@ private ReaderThread readerThread; + private boolean parallel = true; + public NcEventSource() { } @@ -82,6 +84,14 @@ throw new UnsupportedOperationException("Not supported yet."); } + public boolean isParallel() { + return parallel; + } + + public void setParallel(boolean parallel) { + this.parallel = parallel; + } + @Override public EventHandler getHandler() { return handler; @@ -108,10 +118,15 @@ reader = new NcDataBlockReader(ncFile); reader.open(); - readerThread = new ReaderThread(); - readerThread.start(); + if (parallel) { + readerThread = new ReaderThread(); + readerThread.start(); + } started = true; + if (!parallel) { + readAll(); + } } catch (Exception e) { stop0(); throw new ServiceException("Unable to start NcEventDestination. " + e.getMessage(), e); @@ -148,6 +163,37 @@ started = false; } + private void readAll() { + try { + while (!reader.eof()) { + read(); + } + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + } + + private void read() throws IOException { + byte blockType = reader.peekBlockType(); + switch (blockType) { + case NcSessionStatusBlock.TYPE: + NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read(); + SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status()); + handler.handle(event); + break; + case NcSessionPayloadBlock.TYPE: + NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read(); + SessionInfo sessionInfo = payloadBlock.sessionInfo(); + ByteBuff payload = readPayload(payloadBlock.data()); + HttpReqResp messages = httpReader.decodeMessages(payload); + handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName())); + break; + default: + reader.read(); + } + + } + public void join() throws InterruptedException { if (readerThread != null) { readerThread.join(); @@ -173,28 +219,12 @@ @Override public void run() { + working = true; try { - working = true; while (working && !reader.eof()) { - byte blockType = reader.peekBlockType(); - switch (blockType) { - case NcSessionStatusBlock.TYPE: - NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read(); - SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status()); - handler.handle(event); - break; - case NcSessionPayloadBlock.TYPE: - NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read(); - SessionInfo sessionInfo = payloadBlock.sessionInfo(); - ByteBuff payload = readPayload(payloadBlock.data()); - HttpReqResp messages = httpReader.decodeMessages(payload); - handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName())); - break; - default: - reader.read(); - } + read(); } - } catch (IOException e) { + } catch (Exception e) { LOGGER.debug(e.getMessage(), e); }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Dec 15 11:30:43 2017 +0100 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Dec 15 13:28:24 2017 +0100 @@ -1,7 +1,6 @@ package com.passus.st.source; import static com.passus.commons.utils.ResourceUtils.createTmpFile; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.DataEvents; import com.passus.st.client.Event; @@ -58,14 +57,13 @@ @Test(dataProvider = "pcapFiles") public void testRead(String pcapFile) throws Exception { - Log4jConfigurationFactory.enableFactory("debug"); FileEvents fileEvents = writeEvents(pcapFile); try { ArrayListEventHandler handler = new ArrayListEventHandler(); NcEventSource eventSource = new NcEventSource(fileEvents.ncFile); eventSource.setHandler(handler); + eventSource.setParallel(false); eventSource.start(); - eventSource.join(); List<Event> events = handler.getEvents(); assertEquals(fileEvents.events.size(), events.size());