Mercurial > stress-tester
changeset 540:55753289dafa
MemoryEventsCache - many sources support
author | Devel 2 |
---|---|
date | Tue, 12 Sep 2017 12:13:59 +0200 |
parents | 40b568fc8bec |
children | 83f45c3b4c3d |
files | stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/client/DataEvents.java stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java |
diffstat | 4 files changed, 101 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java Tue Sep 12 10:14:05 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Tue Sep 12 12:13:59 2017 +0200 @@ -34,8 +34,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -131,7 +133,7 @@ .hasArg().argName("speed") .build()); - options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays") + options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays. Works only for one pcap file.") .hasArg().argName("replays") .build()); @@ -221,11 +223,15 @@ client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher()); if (cl.hasOption("pr")) { + if (clArgs.length != 1) { + throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file."); + } + int parallelReplays = Integer.parseInt(cl.getOptionValue("pr")); if (parallelReplays > 0 && parallelReplays <= 100) { client.setWorkersNum(parallelReplays); } else { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Parameter \"parallelReplays\" should be in range 1-100."); } } @@ -355,7 +361,13 @@ startTime = System.currentTimeMillis(); if (cl.hasOption("ca")) { - MemoryEventsCache cache = new MemoryEventsCache(client); + Set<String> sourcesName = new HashSet<>(); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + sourcesName.add(eventSrc.getName()); + } + + MemoryEventsCache cache = new MemoryEventsCache(client, sourcesName); for (int i = 0; i < clArgs.length; i++) { PcapSessionEventSource eventSrc = eventSrcs[i]; eventSrc.setHandler(cache);
--- a/stress-tester/src/main/java/com/passus/st/client/DataEvents.java Tue Sep 12 10:14:05 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/DataEvents.java Tue Sep 12 12:13:59 2017 +0200 @@ -33,13 +33,11 @@ public static final int TYPE = 3; - public static final int LOOP_NUM_UNDEFINED = -1; - private int loopNum; public DataLoopEnd(String sourceName) { super(TYPE, sourceName); - this.loopNum = LOOP_NUM_UNDEFINED; + this.loopNum = 0; } public DataLoopEnd(String sourceName, int loopNum) {
--- a/stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java Tue Sep 12 10:14:05 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/MemoryEventsCache.java Tue Sep 12 12:13:59 2017 +0200 @@ -4,7 +4,10 @@ import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -13,7 +16,7 @@ * * @author Mirosław Hawrot */ -public class MemoryEventsCache implements EventHandler { +public final class MemoryEventsCache implements EventHandler { private static final Logger LOGGER = LogManager.getLogger(MemoryEventsCache.class); @@ -31,15 +34,42 @@ private long loopDelay = 0; + private Set<String> sourcesName; + + private int remainsDataEnd = 1; + public MemoryEventsCache(EventHandler handler) { - this(handler, DEFAULT_CACHE_CAPACITY); + this(handler, null); } - public MemoryEventsCache(EventHandler handler, int capacity) { + public MemoryEventsCache(EventHandler handler, Collection<String> sourcesName) { + this(handler, sourcesName, DEFAULT_CACHE_CAPACITY); + + } + + public MemoryEventsCache(EventHandler handler, Collection<String> sourcesName, int capacity) { Assert.greaterThanZero(capacity, "capacity"); Assert.notNull(handler, "handler"); events = new ArrayList<>(capacity); this.handler = handler; + setSourcesName(sourcesName); + } + + private void setSourcesName(Collection<String> sourcesName) { + if (sourcesName == null) { + this.sourcesName = null; + remainsDataEnd = 1; + return; + } + + Assert.notContainsNull(sourcesName); + if (this.sourcesName == null) { + this.sourcesName = new HashSet<>(); + } + + this.sourcesName.clear(); + this.sourcesName.addAll(sourcesName); + remainsDataEnd = this.sourcesName.size(); } public boolean isReady() { @@ -68,6 +98,10 @@ this.loopDelay = loopDelay; } + public Collection<String> getSourcesName() { + return sourcesName; + } + public int size() { return events.size(); } @@ -75,7 +109,12 @@ @Override public void handle(Event event) { boolean ready = readiness.getCount() == 0; - boolean finishing = event instanceof DataEvents.DataLoopEnd || event instanceof DataEvents.DataEnd; + + if (event.getType() == DataEvents.DataEnd.TYPE) { + remainsDataEnd--; + } + + boolean finishing = (remainsDataEnd == 0); if (!ready) { if (finishing) { LOGGER.info("Caching finished. Cache has {} events.", events.size()); @@ -88,15 +127,25 @@ } } + private void sendDataEnd(long timestamp, String sourceName) { + DataEnd endEvent = new DataEnd(sourceName); + endEvent.setTimestamp(timestamp == -1 ? -1 : timestamp + 1); + handler.handle(endEvent); + } + public void send() { if (loop != LOOP_INFINITE) { for (int i = 0; i < loop; i++) { long timestamp = sendOneLoop(i); if (i == loop - 1) { - DataEnd endEvent = new DataEnd("cache"); - endEvent.setTimestamp(timestamp == -1 ? -1 : timestamp + 1); - handler.handle(endEvent); + if (sourcesName == null) { + sendDataEnd(timestamp, ""); + } else { + sourcesName.forEach((name) -> { + sendDataEnd(timestamp, name); + }); + } } } } else { @@ -107,14 +156,18 @@ } private long sendOneLoop(int loopNum) { + long lastTimestamp = 0; for (Event event : events) { - handler.handle(event); + if (event.getType() == DataLoopEnd.TYPE && loopNum > 0) { + DataLoopEnd loopEndEvent = new DataLoopEnd(event.getSourceName(), loopNum); + loopEndEvent.setTimestamp(event.getTimestamp()); + handler.handle(loopEndEvent); + } else { + handler.handle(event); + } + + lastTimestamp = event.getTimestamp(); } - - long timestamp = events.isEmpty() ? -1 : (events.get(events.size() - 1).getTimestamp() + 1); - DataLoopEnd loopEndEvent = new DataLoopEnd("cache", loopNum); - loopEndEvent.setTimestamp(timestamp); - handler.handle(loopEndEvent); if (loopDelay > 0) { try { @@ -122,7 +175,8 @@ } catch (InterruptedException ignore) { } } - return timestamp; + + return lastTimestamp; } }
--- a/stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java Tue Sep 12 10:14:05 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/MemoryEventsCacheTest.java Tue Sep 12 12:13:59 2017 +0200 @@ -1,6 +1,8 @@ package com.passus.st.client; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import static org.testng.Assert.*; import org.testng.annotations.Test; @@ -9,7 +11,7 @@ * @author mikolaj.podbielski */ public class MemoryEventsCacheTest { - + private void sink(Event e) { } @@ -17,38 +19,39 @@ public void testSend() { final int nLoops = 2; final int nEvents = 5; - + List<String> sourcesName = Arrays.asList("test"); + final ArrayList<Event> output = new ArrayList<>(2 * nLoops * (nEvents + 2)); - final MemoryEventsCache cache = new MemoryEventsCache(output::add); + final MemoryEventsCache cache = new MemoryEventsCache(output::add, sourcesName); cache.setLoop(nLoops); - - final SessionStatusEvent sessionEvent = new SessionStatusEvent(null, 2); + + final SessionStatusEvent sessionEvent = new SessionStatusEvent(null, 2, "test"); for (int i = 0; i < nEvents; i++) { cache.handle(sessionEvent); } - cache.handle(new DataEvents.DataLoopEnd("parent")); - cache.handle(new DataEvents.DataEnd()); - + cache.handle(new DataEvents.DataLoopEnd("test")); + cache.handle(new DataEvents.DataEnd("test")); + assertTrue(cache.isReady()); cache.send(); - + int expected = nLoops * (nEvents + 1) + 1; assertEquals(output.size(), expected); - + int loopNum = 0; for (int i = 0; i < output.size(); i++) { Event e = output.get(i); if (i == output.size() - 1) { assertEquals(e.getType(), DataEvents.DataEnd.TYPE, "event " + i); - assertEquals(((DataEvents.DataEnd) e).getSourceName(), "cache"); + assertEquals(((DataEvents.DataEnd) e).getSourceName(), "test"); } else if ((i % (nEvents + 1) == nEvents)) { assertEquals(e.getType(), DataEvents.DataLoopEnd.TYPE, "event " + i); - assertEquals(((DataEvents.DataLoopEnd) e).getSourceName(), "cache"); + assertEquals(((DataEvents.DataLoopEnd) e).getSourceName(), "test"); assertEquals(((DataEvents.DataLoopEnd) e).getLoopNum(), loopNum++); } else { assertEquals(e.getType(), sessionEvent.getType(), "event " + i); } - + } } @@ -60,5 +63,5 @@ cache.await(); assertTrue(cache.isReady()); } - + }