Mercurial > stress-tester
changeset 539:40b568fc8bec
Support for multiple pcap files
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Tue Sep 12 10:14:05 2017 +0200 @@ -13,6 +13,7 @@ import com.passus.st.client.http.HttpClient; import com.passus.st.client.http.HttpReporterClientListener; import com.passus.st.client.http.HttpReporterMetricHandler; +import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher; import com.passus.st.client.http.SummaryHttpClientListener; import com.passus.st.client.http.WriterHttpClientListener; import com.passus.st.client.http.filter.HttpFiltersConfigurator; @@ -176,8 +177,8 @@ try { CommandLine cl = new DefaultParser().parse(options, args); String[] clArgs = cl.getArgs(); - if (clArgs.length != 1) { - System.err.println("Pcap file required."); + if (clArgs.length < 1) { + System.err.println("At least one pcap file required."); printHelp(options); return; } @@ -216,6 +217,9 @@ client.setCollectMetrics(true); client.setConnectPartialSession(cl.hasOption("ps")); client.setWokerType(cl.getOptionValue("wt", "synch")); + client.setWorkersNum(clArgs.length); + client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher()); + if (cl.hasOption("pr")) { int parallelReplays = Integer.parseInt(cl.getOptionValue("pr")); if (parallelReplays > 0 && parallelReplays <= 100) { @@ -299,18 +303,26 @@ client.start(); - PcapSessionEventSource eventSrc = new PcapSessionEventSource(); - eventSrc.setPcapFile(clArgs[0]); - eventSrc.setAllowPartialSession(cl.hasOption("ps")); - eventSrc.setCollectMetrics(true); + PcapSessionEventSource[] eventSrcs = new PcapSessionEventSource[clArgs.length]; + + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = new PcapSessionEventSource(); + eventSrc.setPcapFile(clArgs[i]); + eventSrc.setAllowPartialSession(cl.hasOption("ps")); + eventSrc.setCollectMetrics(true); + eventSrcs[i] = eventSrc; + } if (cl.hasOption("hp")) { - PortRangeSet portsRanges = 
eventSrc.getPortsRange(); - portsRanges.clear(); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + PortRangeSet portsRanges = eventSrc.getPortsRange(); + portsRanges.clear(); - String[] ports = cl.getOptionValues("hp"); - for (String port : ports) { - portsRanges.add(port); + String[] ports = cl.getOptionValues("hp"); + for (String port : ports) { + portsRanges.add(port); + } } } @@ -333,7 +345,10 @@ } collector.addHandler(summMetricsHandler); - collector.register(eventSrc); + for (int i = 0; i < clArgs.length; i++) { + collector.register(eventSrcs[i]); + } + collector.register(emitter); collector.register(client); collector.start(); @@ -341,15 +356,21 @@ startTime = System.currentTimeMillis(); if (cl.hasOption("ca")) { MemoryEventsCache cache = new MemoryEventsCache(client); - eventSrc.setHandler(cache); - eventSrc.start(); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + eventSrc.setHandler(cache); + eventSrc.start(); + } cache.setLoop(loops); cache.await(); cache.send(); } else { - eventSrc.setHandler(client); - eventSrc.setLoops(loops); - eventSrc.start(); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + eventSrc.setHandler(client); + eventSrc.setLoops(loops); + eventSrc.start(); + } } ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @@ -363,7 +384,10 @@ collector.collect(); executor.shutdownNow(); - eventSrc.stop(); + for (int i = 0; i < clArgs.length; i++) { + eventSrcs[i].stop(); + } + client.stop(); emitter.stop(); if (reporterClient != null) {
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractEvent.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractEvent.java Tue Sep 12 10:14:05 2017 +0200 @@ -12,9 +12,12 @@ private final int type; - public AbstractEvent(int type) { + protected final String sourceName; + + public AbstractEvent(int type, String sourceName) { timestamp = TimeGenerator.getDefaultGenerator().currentTimeMillis(); this.type = type; + this.sourceName = sourceName; } @Override @@ -32,4 +35,9 @@ return type; } + @Override + public String getSourceName() { + return sourceName; + } + }
--- a/stress-tester/src/main/java/com/passus/st/client/DataEvents.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/DataEvents.java Tue Sep 12 10:14:05 2017 +0200 @@ -13,19 +13,12 @@ public static final int TYPE = 2; - private final String sourceName; - public DataEnd() { this(""); } public DataEnd(String sourceName) { - super(TYPE); - this.sourceName = sourceName; - } - - public String getSourceName() { - return sourceName; + super(TYPE, sourceName); } @Override @@ -42,26 +35,18 @@ public static final int LOOP_NUM_UNDEFINED = -1; - private final String sourceName; - private int loopNum; public DataLoopEnd(String sourceName) { - super(TYPE); - this.sourceName = sourceName; + super(TYPE, sourceName); this.loopNum = LOOP_NUM_UNDEFINED; } public DataLoopEnd(String sourceName, int loopNum) { - super(TYPE); - this.sourceName = sourceName; + super(TYPE, sourceName); this.loopNum = loopNum; } - public String getSourceName() { - return sourceName; - } - public int getLoopNum() { return loopNum; }
--- a/stress-tester/src/main/java/com/passus/st/client/Event.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/Event.java Tue Sep 12 10:14:05 2017 +0200 @@ -7,9 +7,11 @@ public interface Event { public int getType(); - + public long getTimestamp(); - + public void setTimestamp(long timestamp); - + + public String getSourceName(); + }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionEvent.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionEvent.java Tue Sep 12 10:14:05 2017 +0200 @@ -11,13 +11,16 @@ private final SessionInfo sessionInfo; + private final String sourceName; + private long timestamp; - public SessionEvent(SessionInfo sessionInfo) { + public SessionEvent(SessionInfo sessionInfo, String sourceName) { this.sessionInfo = sessionInfo; timestamp = TimeGenerator.getDefaultGenerator().currentTimeMillis(); + this.sourceName = sourceName; } - + public SessionInfo getSessionInfo() { return sessionInfo; } @@ -32,5 +35,10 @@ this.timestamp = timestamp; } + @Override + public String getSourceName() { + return sourceName; + } + public abstract SessionEvent instanceForWorker(int index); }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Tue Sep 12 10:14:05 2017 +0200 @@ -10,8 +10,8 @@ private final T payload; - public SessionPayloadEvent(SessionInfo sessionInfo, T payload) { - super(sessionInfo); + public SessionPayloadEvent(SessionInfo sessionInfo, T payload, String sourceName) { + super(sessionInfo, sourceName); this.payload = payload; }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Tue Sep 12 10:14:05 2017 +0200 @@ -18,12 +18,16 @@ private final int status; private SessionStatusEvent() { - super(null); + super(null, null); status = 0; } public SessionStatusEvent(SessionInfo sessionInfo, int status) { - super(sessionInfo); + this(sessionInfo, status, null); + } + + public SessionStatusEvent(SessionInfo sessionInfo, int status, String sourceName) { + super(sessionInfo, sourceName); if (status < STATUS_OPENING || status > STATUS_CLOSED) { throw new IllegalArgumentException("Invalid status '" + status + "'."); @@ -66,7 +70,7 @@ @Override public SessionStatusEvent instanceForWorker(int index) { - return new SessionStatusEvent(getSessionInfo(), status); + return new SessionStatusEvent(getSessionInfo(), status, getSourceName()); } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Tue Sep 12 10:14:05 2017 +0200 @@ -179,7 +179,7 @@ HttpResponse resp = payloadEvent.getResponse(); HttpRequest req = payloadEvent.getRequest(); long respTime = time + (resp.getTimestamp() - req.getTimestamp()); - HttpResponseEvent respEvent = new HttpResponseEvent(session, resp, respTime); + HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime); task.events.add(respEvent); } @@ -366,33 +366,12 @@ } } - private static final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> { - - public static final int TYPE = 1011; - - public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) { - super(sessionInfo, payload); - setTimestamp(timestamp); - } - - @Override - public SessionEvent instanceForWorker(int index) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getType() { - return TYPE; - } - - } - private static final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> { public static final int TYPE = 1012; - public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) { - super(sessionInfo, payload); + public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) { + super(sessionInfo, payload, sourceName); setTimestamp(timestamp); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Tue Sep 12 10:14:05 2017 +0200 @@ -45,6 +45,8 @@ private boolean connectPartialSession = false; + private HttpClientWorkerDispatcher dispatcher; + public HttpClient(Emitter emitter) { Assert.notNull(emitter, "emitter"); this.emitter = emitter; @@ -100,6 +102,14 @@ this.sleepFactor = sleepFactor; } + public HttpClientWorkerDispatcher getDispatcher() { + return dispatcher; + } + + public void setDispatcher(HttpClientWorkerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + public int getActiveConnections() { int count = 0; for (HttpClientWorker worker : workers) { @@ -238,9 +248,21 @@ LOGGER.trace("Event: {}", event); } - for (HttpClientWorker worker : workers) { - worker.handle(event); + try { + if (dispatcher == null) { + for (HttpClientWorker worker : workers) { + worker.handle(event); + } + } else { + int index = dispatcher.dispatch(event, workers); + workers[index].handle(event); + } + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(e.getMessage(), e); + } } + } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java Tue Sep 12 10:14:05 2017 +0200 @@ -1,16 +1,15 @@ package com.passus.st.client.http; -import com.passus.st.client.SessionEvent; +import com.passus.st.client.Event; /** * * @author Mirosław Hawrot */ -@Deprecated public interface HttpClientWorkerDispatcher { - public abstract HttpClientWorker find(SessionEvent event, HttpClientWorker[] workers); + public abstract HttpClientWorker find(Event event, HttpClientWorker[] workers); - public abstract int dispatch(SessionEvent event, HttpClientWorker[] workers); + public abstract int dispatch(Event event, HttpClientWorker[] workers); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Tue Sep 12 10:14:05 2017 +0200 @@ -7,7 +7,6 @@ import com.passus.data.DataDecoder; import com.passus.data.HeapByteBuff; import com.passus.net.http.HttpFullMessageDecoder; -import com.passus.net.http.HttpMethod; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpRequestEncoder; import com.passus.net.http.HttpResponse;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java Tue Sep 12 10:14:05 2017 +0200 @@ -15,12 +15,12 @@ public static final int TYPE = 12; - public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp) { - this(sessionInfo, new HttpReqResp(req, resp)); + public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) { + this(sessionInfo, new HttpReqResp(req, resp), sourceName); } - public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload) { - super(sessionInfo, payload); + public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload, String sourceName) { + super(sessionInfo, payload, sourceName); } public HttpRequest getRequest() { @@ -40,7 +40,7 @@ public HttpSessionPayloadEvent instanceForWorker(int index) { HttpReqResp payload = getPayload(); HttpRequest reqCopy = payload.request == null ? null : new HttpRequest(payload.request); - return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response); + return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName()); } public static class HttpReqResp {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSourceNameAwareClientWorkerDispatcher.java Tue Sep 12 10:14:05 2017 +0200 @@ -0,0 +1,41 @@ +package com.passus.st.client.http; + +import com.passus.st.client.Event; +import java.util.ArrayList; +import java.util.List; + +/** + * + * @author Mirosław Hawrot + */ +public class HttpSourceNameAwareClientWorkerDispatcher implements HttpClientWorkerDispatcher { + + private final List<String> sources = new ArrayList<>(4); + + @Override + public HttpClientWorker find(Event event, HttpClientWorker[] workers) { + int index = dispatch(event, workers); + return workers[index]; + } + + @Override + public int dispatch(Event event, HttpClientWorker[] workers) { + String sourceName = event.getSourceName(); + if (sourceName == null) { + throw new IllegalArgumentException("Event '" + event.getClass() + "' source name cannot be null."); + } + + int index = sources.indexOf(sourceName); + if (index == -1) { + if (sources.size() == workers.length) { + throw new IllegalArgumentException("Too low workers number."); + } + + sources.add(sourceName); + index = sources.size() - 1; + } + + return index; + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/ModuloHttpClientWorkerDispatcher.java Mon Sep 11 12:17:48 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.st.client.SessionEvent; - -/** - * - * @author Mirosław Hawrot - */ -@Deprecated -public class ModuloHttpClientWorkerDispatcher implements HttpClientWorkerDispatcher { - - @Override - public HttpClientWorker find(SessionEvent event, HttpClientWorker[] workers) { - int index = dispatch(event, workers); - return workers[index]; - } - - @Override - public int dispatch(SessionEvent event, HttpClientWorker[] workers) { - int hashCode = (event.getSessionInfo().hashCode()) & 0x7fffffff; - return hashCode % workers.length; - } - -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/DynamicSessionMapper.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/DynamicSessionMapper.java Tue Sep 12 10:14:05 2017 +0200 @@ -1,7 +1,6 @@ package com.passus.st.emitter; import com.passus.net.IpAddress; -import com.passus.net.SocketAddress; import java.util.HashMap; import java.util.Map; import java.util.Set;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Tue Sep 12 10:14:05 2017 +0200 @@ -6,6 +6,7 @@ import com.passus.net.session.SessionBean; import com.passus.net.session.SessionUtils; import java.text.ParseException; +import java.util.Objects; import org.apache.commons.lang3.StringUtils; /** @@ -32,6 +33,8 @@ private final int hashCode; + private String sourceName; + public SessionInfo(String srcSocket, String dstSocket) throws ParseException { this(new SocketAddress(srcSocket), new SocketAddress(dstSocket)); } @@ -67,6 +70,14 @@ this.hashCode = (srcPort + dstPort << 16) ^ srcIp.hashCode() ^ dstIp.hashCode(); } + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + public int getTransport() { return transport; } @@ -110,7 +121,8 @@ } final SessionInfo other = (SessionInfo) obj; - return transport == transport + return Objects.equals(sourceName, other.sourceName) + && transport == other.transport && srcPort == other.srcPort && dstPort == other.dstPort && srcIp.equals(other.srcIp) && dstIp.equals(other.dstIp); } @@ -124,6 +136,10 @@ sb.append("<->"); sb.append(dstIp).append(":").append(dstPort); + if (sourceName != null) { + sb.append(" ").append(sourceName); + } + return sb.toString(); }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Mon Sep 11 12:17:48 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Tue Sep 12 10:14:05 2017 +0200 @@ -4,7 +4,7 @@ import com.passus.commons.annotations.Plugin; import com.passus.commons.service.ServiceException; import com.passus.commons.time.CustomTimeGenerator; -import com.passus.commons.utils.StringUtils; +import com.passus.commons.utils.UniqueIdGenerator; import com.passus.config.Configuration; import com.passus.net.Frame; import com.passus.net.MemoryFrame; @@ -91,7 +91,7 @@ public PcapSessionEventSource() { portsRange.add(80); portsRange.add(8080); - this.name = StringUtils.randomString(); + this.name = UniqueIdGenerator.generate(); } @Override @@ -528,8 +528,9 @@ context.getSrcIpAddr(), context.getSrcPort(), context.getDstIpAddr(), context.getDstPort(), context.getProtocol(), context.getId()); - - Event event = new HttpSessionPayloadEvent(info, req, (HttpResponse) message); + info.setSourceName(name); + + Event event = new HttpSessionPayloadEvent(info, req, (HttpResponse) message, name); event.setTimestamp(timestamp); eventHandler.handle(event); if (collectMetric) { @@ -555,8 +556,9 @@ context.getSrcIpAddr(), context.getSrcPort(), context.getDstIpAddr(), context.getDstPort(), context.getProtocol(), context.getId()); - - Event event = new SessionStatusEvent(info, sessionInfoStatus); + info.setSourceName(name); + + Event event = new SessionStatusEvent(info, sessionInfoStatus, name); event.setTimestamp(timestamp); eventHandler.handle(event); } @@ -570,7 +572,7 @@ private final Ip ip; public IpEvent(Ip ip) { - super(TYPE); + super(TYPE, null); this.ip = ip; }