changeset 763:cb51fb3a6ebd

NcEventSource.parallel option
author Devel 2
date Fri, 15 Dec 2017 13:28:24 +0100
parents 6bb60e93eb43
children dc80677e532f
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java
diffstat 3 files changed, 56 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Fri Dec 15 11:30:43 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockReader.java	Fri Dec 15 13:28:24 2017 +0100
@@ -140,6 +140,10 @@
     }
 
     private int read(int requiredBytes) throws IOException {
+        return read(requiredBytes, buffer);
+    }
+
+    private int read(int requiredBytes, ByteBuff buffer) throws IOException {
         if (eof) {
             return -1;
         }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri Dec 15 11:30:43 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri Dec 15 13:28:24 2017 +0100
@@ -46,6 +46,8 @@
 
     private ReaderThread readerThread;
 
+    private boolean parallel = true;
+
     public NcEventSource() {
     }
 
@@ -82,6 +84,14 @@
         throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    public boolean isParallel() {
+        return parallel;
+    }
+
+    public void setParallel(boolean parallel) {
+        this.parallel = parallel;
+    }
+
     @Override
     public EventHandler getHandler() {
         return handler;
@@ -108,10 +118,15 @@
             reader = new NcDataBlockReader(ncFile);
             reader.open();
 
-            readerThread = new ReaderThread();
-            readerThread.start();
+            if (parallel) {
+                readerThread = new ReaderThread();
+                readerThread.start();
+            }
 
             started = true;
+            if (!parallel) {
+                readAll();
+            }
         } catch (Exception e) {
             stop0();
             throw new ServiceException("Unable to start NcEventDestination. " + e.getMessage(), e);
@@ -148,6 +163,37 @@
         started = false;
     }
 
+    private void readAll() {
+        try {
+            while (!reader.eof()) {
+                read();
+            }
+        } catch (Exception e) {
+            LOGGER.debug(e.getMessage(), e);
+        }
+    }
+
+    private void read() throws IOException {
+        byte blockType = reader.peekBlockType();
+        switch (blockType) {
+            case NcSessionStatusBlock.TYPE:
+                NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
+                SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
+                handler.handle(event);
+                break;
+            case NcSessionPayloadBlock.TYPE:
+                NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
+                SessionInfo sessionInfo = payloadBlock.sessionInfo();
+                ByteBuff payload = readPayload(payloadBlock.data());
+                HttpReqResp messages = httpReader.decodeMessages(payload);
+                handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName()));
+                break;
+            default:
+                reader.read();
+        }
+
+    }
+
     public void join() throws InterruptedException {
         if (readerThread != null) {
             readerThread.join();
@@ -173,28 +219,12 @@
 
         @Override
         public void run() {
+            working = true;
             try {
-                working = true;
                 while (working && !reader.eof()) {
-                    byte blockType = reader.peekBlockType();
-                    switch (blockType) {
-                        case NcSessionStatusBlock.TYPE:
-                            NcSessionStatusBlock statusBlock = (NcSessionStatusBlock) reader.read();
-                            SessionStatusEvent event = new SessionStatusEvent(statusBlock.sessionInfo(), statusBlock.status());
-                            handler.handle(event);
-                            break;
-                        case NcSessionPayloadBlock.TYPE:
-                            NcSessionPayloadBlock payloadBlock = (NcSessionPayloadBlock) reader.read();
-                            SessionInfo sessionInfo = payloadBlock.sessionInfo();
-                            ByteBuff payload = readPayload(payloadBlock.data());
-                            HttpReqResp messages = httpReader.decodeMessages(payload);
-                            handler.handle(new HttpSessionPayloadEvent(sessionInfo, messages, sessionInfo.getSourceName()));
-                            break;
-                        default:
-                            reader.read();
-                    }
+                    read();
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.debug(e.getMessage(), e);
             }
 
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Dec 15 11:30:43 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Dec 15 13:28:24 2017 +0100
@@ -1,7 +1,6 @@
 package com.passus.st.source;
 
 import static com.passus.commons.utils.ResourceUtils.createTmpFile;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.DataEvents;
 import com.passus.st.client.Event;
@@ -58,14 +57,13 @@
 
     @Test(dataProvider = "pcapFiles")
     public void testRead(String pcapFile) throws Exception {
-        Log4jConfigurationFactory.enableFactory("debug");
         FileEvents fileEvents = writeEvents(pcapFile);
         try {
             ArrayListEventHandler handler = new ArrayListEventHandler();
             NcEventSource eventSource = new NcEventSource(fileEvents.ncFile);
             eventSource.setHandler(handler);
+            eventSource.setParallel(false);
             eventSource.start();
-            eventSource.join();
 
             List<Event> events = handler.getEvents();
             assertEquals(fileEvents.events.size(), events.size());