changeset 540:55753289dafa

MemoryEventsCache - support for multiple sources
author Devel 2
date Tue, 12 Sep 2017 12:13:59 +0200
parents 40b568fc8bec
children 83f45c3b4c3d
files stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/client/DataEvents.java stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java
diffstat 4 files changed, 101 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Tue Sep 12 10:14:05 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Tue Sep 12 12:13:59 2017 +0200
@@ -34,8 +34,10 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -131,7 +133,7 @@
                 .hasArg().argName("speed")
                 .build());
 
-        options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays")
+        options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays. Works only for one pcap file.")
                 .hasArg().argName("replays")
                 .build());
 
@@ -221,11 +223,15 @@
             client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher());
 
             if (cl.hasOption("pr")) {
+                if (clArgs.length != 1) {
+                    throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file.");
+                }
+
                 int parallelReplays = Integer.parseInt(cl.getOptionValue("pr"));
                 if (parallelReplays > 0 && parallelReplays <= 100) {
                     client.setWorkersNum(parallelReplays);
                 } else {
-                    throw new IllegalArgumentException();
+                    throw new IllegalArgumentException("Parameter \"parallelReplays\" should be in range 1-100.");
                 }
             }
 
@@ -355,7 +361,13 @@
 
             startTime = System.currentTimeMillis();
             if (cl.hasOption("ca")) {
-                MemoryEventsCache cache = new MemoryEventsCache(client);
+                Set<String> sourcesName = new HashSet<>();
+                for (int i = 0; i < clArgs.length; i++) {
+                    PcapSessionEventSource eventSrc = eventSrcs[i];
+                    sourcesName.add(eventSrc.getName());
+                }
+
+                MemoryEventsCache cache = new MemoryEventsCache(client, sourcesName);
                 for (int i = 0; i < clArgs.length; i++) {
                     PcapSessionEventSource eventSrc = eventSrcs[i];
                     eventSrc.setHandler(cache);
--- a/stress-tester/src/main/java/com/passus/st/client/DataEvents.java	Tue Sep 12 10:14:05 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/DataEvents.java	Tue Sep 12 12:13:59 2017 +0200
@@ -33,13 +33,11 @@
 
         public static final int TYPE = 3;
 
-        public static final int LOOP_NUM_UNDEFINED = -1;
-
         private int loopNum;
 
         public DataLoopEnd(String sourceName) {
             super(TYPE, sourceName);
-            this.loopNum = LOOP_NUM_UNDEFINED;
+            this.loopNum = 0;
         }
 
         public DataLoopEnd(String sourceName, int loopNum) {
--- a/stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java	Tue Sep 12 10:14:05 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java	Tue Sep 12 12:13:59 2017 +0200
@@ -4,7 +4,10 @@
 import com.passus.st.client.DataEvents.DataEnd;
 import com.passus.st.client.DataEvents.DataLoopEnd;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -13,7 +16,7 @@
  *
  * @author Mirosław Hawrot
  */
-public class MemoryEventsCache implements EventHandler {
+public final class MemoryEventsCache implements EventHandler {
 
     private static final Logger LOGGER = LogManager.getLogger(MemoryEventsCache.class);
 
@@ -31,15 +34,42 @@
 
     private long loopDelay = 0;
 
+    private Set<String> sourcesName;
+
+    private int remainsDataEnd = 1;
+
     public MemoryEventsCache(EventHandler handler) {
-        this(handler, DEFAULT_CACHE_CAPACITY);
+        this(handler, null);
     }
 
-    public MemoryEventsCache(EventHandler handler, int capacity) {
+    public MemoryEventsCache(EventHandler handler, Collection<String> sourcesName) {
+        this(handler, sourcesName, DEFAULT_CACHE_CAPACITY);
+
+    }
+
+    public MemoryEventsCache(EventHandler handler, Collection<String> sourcesName, int capacity) {
         Assert.greaterThanZero(capacity, "capacity");
         Assert.notNull(handler, "handler");
         events = new ArrayList<>(capacity);
         this.handler = handler;
+        setSourcesName(sourcesName);
+    }
+
+    private void setSourcesName(Collection<String> sourcesName) {
+        if (sourcesName == null) {
+            this.sourcesName = null;
+            remainsDataEnd = 1;
+            return;
+        }
+
+        Assert.notContainsNull(sourcesName);
+        if (this.sourcesName == null) {
+            this.sourcesName = new HashSet<>();
+        }
+
+        this.sourcesName.clear();
+        this.sourcesName.addAll(sourcesName);
+        remainsDataEnd = this.sourcesName.size();
     }
 
     public boolean isReady() {
@@ -68,6 +98,10 @@
         this.loopDelay = loopDelay;
     }
 
+    public Collection<String> getSourcesName() {
+        return sourcesName;
+    }
+
     public int size() {
         return events.size();
     }
@@ -75,7 +109,12 @@
     @Override
     public void handle(Event event) {
         boolean ready = readiness.getCount() == 0;
-        boolean finishing = event instanceof DataEvents.DataLoopEnd || event instanceof DataEvents.DataEnd;
+
+        if (event.getType() == DataEvents.DataEnd.TYPE) {
+            remainsDataEnd--;
+        }
+
+        boolean finishing = (remainsDataEnd == 0);
         if (!ready) {
             if (finishing) {
                 LOGGER.info("Caching finished. Cache has {} events.", events.size());
@@ -88,15 +127,25 @@
         }
     }
 
+    private void sendDataEnd(long timestamp, String sourceName) {
+        DataEnd endEvent = new DataEnd(sourceName);
+        endEvent.setTimestamp(timestamp == -1 ? -1 : timestamp + 1);
+        handler.handle(endEvent);
+    }
+
     public void send() {
         if (loop != LOOP_INFINITE) {
             for (int i = 0; i < loop; i++) {
                 long timestamp = sendOneLoop(i);
 
                 if (i == loop - 1) {
-                    DataEnd endEvent = new DataEnd("cache");
-                    endEvent.setTimestamp(timestamp == -1 ? -1 : timestamp + 1);
-                    handler.handle(endEvent);
+                    if (sourcesName == null) {
+                        sendDataEnd(timestamp, "");
+                    } else {
+                        sourcesName.forEach((name) -> {
+                            sendDataEnd(timestamp, name);
+                        });
+                    }
                 }
             }
         } else {
@@ -107,14 +156,18 @@
     }
 
     private long sendOneLoop(int loopNum) {
+        long lastTimestamp = 0;
         for (Event event : events) {
-            handler.handle(event);
+            if (event.getType() == DataLoopEnd.TYPE && loopNum > 0) {
+                DataLoopEnd loopEndEvent = new DataLoopEnd(event.getSourceName(), loopNum);
+                loopEndEvent.setTimestamp(event.getTimestamp());
+                handler.handle(loopEndEvent);
+            } else { 
+                handler.handle(event);
+            }
+            
+            lastTimestamp = event.getTimestamp();
         }
-        
-        long timestamp = events.isEmpty() ? -1 : (events.get(events.size() - 1).getTimestamp() + 1);
-        DataLoopEnd loopEndEvent = new DataLoopEnd("cache", loopNum);
-        loopEndEvent.setTimestamp(timestamp);
-        handler.handle(loopEndEvent);
 
         if (loopDelay > 0) {
             try {
@@ -122,7 +175,8 @@
             } catch (InterruptedException ignore) {
             }
         }
-        return timestamp;
+
+        return lastTimestamp;
     }
 
 }
--- a/stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java	Tue Sep 12 10:14:05 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java	Tue Sep 12 12:13:59 2017 +0200
@@ -1,6 +1,8 @@
 package com.passus.st.client;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import static org.testng.Assert.*;
 import org.testng.annotations.Test;
 
@@ -9,7 +11,7 @@
  * @author mikolaj.podbielski
  */
 public class MemoryEventsCacheTest {
-    
+
     private void sink(Event e) {
     }
 
@@ -17,38 +19,39 @@
     public void testSend() {
         final int nLoops = 2;
         final int nEvents = 5;
-        
+        List<String> sourcesName = Arrays.asList("test");
+
         final ArrayList<Event> output = new ArrayList<>(2 * nLoops * (nEvents + 2));
-        final MemoryEventsCache cache = new MemoryEventsCache(output::add);
+        final MemoryEventsCache cache = new MemoryEventsCache(output::add, sourcesName);
         cache.setLoop(nLoops);
-        
-        final SessionStatusEvent sessionEvent = new SessionStatusEvent(null, 2);
+
+        final SessionStatusEvent sessionEvent = new SessionStatusEvent(null, 2, "test");
         for (int i = 0; i < nEvents; i++) {
             cache.handle(sessionEvent);
         }
-        cache.handle(new DataEvents.DataLoopEnd("parent"));
-        cache.handle(new DataEvents.DataEnd());
-        
+        cache.handle(new DataEvents.DataLoopEnd("test"));
+        cache.handle(new DataEvents.DataEnd("test"));
+
         assertTrue(cache.isReady());
         cache.send();
-        
+
         int expected = nLoops * (nEvents + 1) + 1;
         assertEquals(output.size(), expected);
-        
+
         int loopNum = 0;
         for (int i = 0; i < output.size(); i++) {
             Event e = output.get(i);
             if (i == output.size() - 1) {
                 assertEquals(e.getType(), DataEvents.DataEnd.TYPE, "event " + i);
-                assertEquals(((DataEvents.DataEnd) e).getSourceName(), "cache");
+                assertEquals(((DataEvents.DataEnd) e).getSourceName(), "test");
             } else if ((i % (nEvents + 1) == nEvents)) {
                 assertEquals(e.getType(), DataEvents.DataLoopEnd.TYPE, "event " + i);
-                assertEquals(((DataEvents.DataLoopEnd) e).getSourceName(), "cache");
+                assertEquals(((DataEvents.DataLoopEnd) e).getSourceName(), "test");
                 assertEquals(((DataEvents.DataLoopEnd) e).getLoopNum(), loopNum++);
             } else {
                 assertEquals(e.getType(), sessionEvent.getType(), "event " + i);
             }
-            
+
         }
     }
 
@@ -60,5 +63,5 @@
         cache.await();
         assertTrue(cache.isReady());
     }
-    
+
 }