Mercurial > stress-tester
changeset 563:0f92cca7980c
reporter - sequence
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java Thu Sep 21 15:41:50 2017 +0200 @@ -1,8 +1,7 @@ package com.passus.st.avro; -import com.passus.st.reporter.trx.ServerMain; +import com.passus.st.reporter.trx.Server; import com.passus.st.reporter.trx.SocketReporterClient; -import com.passus.st.reporter.protocol.Reporter; import com.passus.utils.AllocationUtils; import java.io.IOException; import java.util.concurrent.SynchronousQueue; @@ -35,8 +34,8 @@ @Warmup(iterations = 7) public class AvroCustomBenchmark extends AbstractAvroBenchmark { - private final ServerMain server = new ServerMain("localhost", 11111, 500, new DummyReporter()); - private final SocketReporterClient client = new SocketReporterClient(serverAddress, new SynchronousQueue<>(), false); + private final Server server = new Server(serverAddress, new DummyReporter()); + private final SocketReporterClient client = new SocketReporterClient(serverAddress, 4, new SynchronousQueue<>(), false); private final TestMetric metric = new TestMetric(); @Setup
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterImpl.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterImpl.java Thu Sep 21 15:41:50 2017 +0200 @@ -145,6 +145,11 @@ reqRespFile.flush(); builder.setLength(0); } + } else if (code.equalsIgnoreCase("sequence")) { + Map<CharSequence, Object> fields = metric.getFields(); + System.out.println("SEQUENCE: " + fields); + } else { + System.out.println("Unknown metric: " + code); } return "OK";
--- a/stress-tester/src/main/java/com/passus/st/Main.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Thu Sep 21 15:41:50 2017 +0200 @@ -2,6 +2,7 @@ import com.passus.commons.ConversionException; import com.passus.commons.metric.Metric; +import com.passus.commons.service.Registry; import com.passus.config.Configuration; import com.passus.config.YamlConfigurationReader; import com.passus.config.validation.Errors; @@ -11,8 +12,7 @@ import com.passus.st.client.MemoryEventsCache; import com.passus.st.client.http.DumperHttpClientListener; import com.passus.st.client.http.HttpClient; -import com.passus.st.client.http.HttpReporterClientListener; -import com.passus.st.client.http.HttpReporterMetricHandler; +import com.passus.st.client.http.ReporterWrapper; import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher; import com.passus.st.client.http.SummaryHttpClientListener; import com.passus.st.client.http.WriterHttpClientListener; @@ -255,21 +255,6 @@ } } - if (cl.hasOption("ff")) { - File filtersFile = new File(cl.getOptionValue("ff")); - Configuration config = YamlConfigurationReader.readFromFile(filtersFile); - HttpFiltersConfigurator configurator = new HttpFiltersConfigurator(client); - Errors errors = new Errors(); - configurator.configure(config, errors); - if (errors.getErrorCount() != 0) { - System.out.println("Error in file '" + filtersFile.getAbsolutePath() + "'."); - for (ObjectError error : errors.getAllErrors()) { - System.out.println("\t" + objectErrorToString(error)); - } - System.exit(1); - } - } - if (cl.hasOption("wf")) { WriterHttpClientListener writerListener; String value = cl.getOptionValue("wf"); @@ -320,8 +305,25 @@ } reporterClient.start(); - client.addListener(new HttpReporterClientListener(reporterClient)); - collector.addHandler(new HttpReporterMetricHandler(reporterClient)); + ReporterWrapper reporterWrapper = new ReporterWrapper(reporterClient); + Registry.getInstance().add("reporterWrapper", reporterWrapper); + client.addListener(reporterWrapper); + collector.addHandler(reporterWrapper); + } + + if (cl.hasOption("ff")) { + File filtersFile = new File(cl.getOptionValue("ff")); + Configuration config = YamlConfigurationReader.readFromFile(filtersFile); + HttpFiltersConfigurator configurator = new HttpFiltersConfigurator(client); + Errors errors = new Errors(); + configurator.configure(config, errors); + if (errors.getErrorCount() != 0) { + System.out.println("Error in file '" + filtersFile.getAbsolutePath() + "'."); + for (ObjectError error : errors.getAllErrors()) { + System.out.println("\t" + objectErrorToString(error)); + } + System.exit(1); + } } client.start();
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 21 15:41:50 2017 +0200 @@ -21,9 +21,9 @@ import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE; import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; import com.passus.st.client.http.HttpFlowContext; -import com.passus.st.client.http.HttpReporterClientListener; import com.passus.st.client.http.HttpScopes; import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.client.http.ReporterWrapper; import com.passus.st.emitter.SessionInfo; import com.passus.st.metric.FileMetricsCollectionAppender; import com.passus.st.metric.ScheduledMetricsCollector; @@ -169,13 +169,13 @@ private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder(); private final HttpResponseEncoder respEncoder = new HttpResponseEncoder(); - private final HttpReporterClientListener reporter; + private final ReporterWrapper reporter; private final boolean partialSession; private volatile int count; private volatile boolean finished; public LocalHandler(SocketReporterClient reporterClient, boolean partialSession) { - reporter = new HttpReporterClientListener(reporterClient); + reporter = new ReporterWrapper(reporterClient); this.partialSession = partialSession; }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java Thu Sep 21 15:21:35 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,121 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.data.ByteString; -import com.passus.net.SocketAddress; -import com.passus.net.http.HttpHeaders; -import com.passus.net.http.HttpMessage; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.ParametersBag; -import static com.passus.st.client.http.HttpConsts.PARAM_USERNAME; -import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE; -import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE; -import static com.passus.st.client.http.HttpConsts.TAG_SESSION_ID; -import static com.passus.st.client.http.HttpConsts.TAG_TIME_END; -import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.reporter.ReporterClient; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * - * @author Mirosław Hawrot - */ -public class HttpReporterClientListener implements HttpClientListener { - - private static final Set<ByteString> ALLOWED_REQ_HEADERS = new HashSet<>(); - private static final Set<ByteString> ALLOWED_RESP_HEADERS = new HashSet<>(); - - private final ReporterClient reporterClient; - - public HttpReporterClientListener(ReporterClient reporterClient) { - this.reporterClient = reporterClient; - } - - static { - ALLOWED_REQ_HEADERS.add(HttpHeaders.USER_AGENT); - ALLOWED_REQ_HEADERS.add(HttpHeaders.CONTENT_TYPE); - ALLOWED_RESP_HEADERS.add(HttpHeaders.CONTENT_TYPE); - } - - private void populateHeaders(Map<CharSequence, CharSequence> dst, Set<ByteString> allowedHeaders, HttpMessage message) { - HttpHeaders headers = message.getHeaders(); - for (ByteString name : allowedHeaders) { - ByteString value = headers.get(name); - if (value != null) { - dst.put(name, value); - } - } - } - - private void populateMisc(Map<CharSequence, CharSequence> misc, HttpFlowContext context, HttpRequest request) { - ParametersBag params = context.scopes().getSession(request, false); - String username = params == null ? null : (String) params.get(PARAM_USERNAME); - - String sessionId = (String) request.getTag(TAG_SESSION_ID); - if (username != null || sessionId != null) { - if (username != null) { - misc.put("username", username); - } - - if (sessionId != null) { - misc.put("sessionId", sessionId); - } - } - } - - @Override - public void responseReceived(HttpRequest request, HttpResponse response, HttpFlowContext context) { - if (request == null && response == null) { - return; - } - - SessionInfo session = context.sessionInfo(); - SocketAddress localAddress = context.channelContext.getLocalAddress(); - SocketAddress remoteAddress = context.channelContext.getRemoteAddress(); - - HttpRequestResponseMetric metric = new HttpRequestResponseMetric(); - - metric.setClientIp(localAddress.getIp().toString()); - metric.setClientPort(localAddress.getPort()); - metric.setServerIp(remoteAddress.getIp().toString()); - metric.setServerPort(remoteAddress.getPort()); - metric.setOrigClientIp(session.getSrcIp().toString()); - metric.setOrigClientPort(session.getSrcPort()); - metric.setOrigServerIp(session.getDstIp().toString()); - metric.setOrigServerPort(session.getDstPort()); - - if (request != null) { - metric.setReqId(request.getId()); - populateHeaders(metric.getReqHdrs(), ALLOWED_REQ_HEADERS, request); - populateMisc(metric.getMisc(), context, request); - - metric.setReqHdrSize((Long) request.getTag(TAG_HEADER_SIZE)); - metric.setReqCntSize((Long) request.getTag(TAG_CONTENT_SIZE)); - metric.setReqStart((Long) request.getTag(TAG_TIME_START)); - metric.setReqStop((Long) request.getTag(TAG_TIME_END)); - - metric.setMethod(request.getMethod().toString()); - metric.setUrl(request.getUrl().toString()); - metric.setReqVersion(request.getVersion().toString()); - } - - if (response != null) { - populateHeaders(metric.getRespHdrs(), ALLOWED_RESP_HEADERS, response); - - metric.setRespHdrSize((Long) response.getTag(TAG_HEADER_SIZE)); - metric.setRespCntSize((Long) response.getTag(TAG_CONTENT_SIZE)); - metric.setRespStart((Long) response.getTag(TAG_TIME_START)); - metric.setRespStop((Long) response.getTag(TAG_TIME_END)); - - metric.setCode(response.getStatus().getCode()); - metric.setReason(response.getStatus().getReasonPhrase().toString()); - metric.setRespVersion(response.getVersion().toString()); - } - - reporterClient.send(metric); - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java Thu Sep 21 15:21:35 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.metric.MetricsCollection; -import com.passus.st.metric.MetricsCollectionHandler; -import com.passus.st.reporter.ReporterClient; - -/** - * - * @author mikolaj.podbielski - */ -public class HttpReporterMetricHandler implements MetricsCollectionHandler { - - private final ReporterClient reporterClient; - - public HttpReporterMetricHandler(ReporterClient reporterClient) { - this.reporterClient = reporterClient; - } - - @Override - public void handle(MetricsCollection mc) { - reporterClient.send(mc); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/ReporterWrapper.java Thu Sep 21 15:41:50 2017 +0200 @@ -0,0 +1,134 @@ +package com.passus.st.client.http; + +import com.passus.commons.metric.MapMetric; +import com.passus.commons.metric.MetricsCollection; +import com.passus.data.ByteString; +import com.passus.net.SocketAddress; +import com.passus.net.http.HttpHeaders; +import com.passus.net.http.HttpMessage; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.ParametersBag; +import static com.passus.st.client.http.HttpConsts.PARAM_USERNAME; +import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE; +import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE; +import static com.passus.st.client.http.HttpConsts.TAG_SESSION_ID; +import static com.passus.st.client.http.HttpConsts.TAG_TIME_END; +import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; +import com.passus.st.client.http.filter.HttpSequenceListener; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricsCollectionHandler; +import com.passus.st.reporter.ReporterClient; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + * @author Mirosław Hawrot + */ +public class ReporterWrapper implements HttpClientListener, MetricsCollectionHandler, HttpSequenceListener { + + private static final Set<ByteString> ALLOWED_REQ_HEADERS = new HashSet<>(); + private static final Set<ByteString> ALLOWED_RESP_HEADERS = new HashSet<>(); + + private final ReporterClient reporterClient; + + public ReporterWrapper(ReporterClient reporterClient) { + this.reporterClient = reporterClient; + } + + static { + ALLOWED_REQ_HEADERS.add(HttpHeaders.USER_AGENT); + ALLOWED_REQ_HEADERS.add(HttpHeaders.CONTENT_TYPE); + ALLOWED_RESP_HEADERS.add(HttpHeaders.CONTENT_TYPE); + } + + private void populateHeaders(Map<CharSequence, CharSequence> dst, Set<ByteString> allowedHeaders, HttpMessage message) { + HttpHeaders headers = message.getHeaders(); + for (ByteString name : allowedHeaders) { + ByteString value = headers.get(name); + if (value != null) { + dst.put(name, value); + } + } + } + + private void populateMisc(Map<CharSequence, CharSequence> misc, HttpFlowContext context, HttpRequest request) { + ParametersBag params = context.scopes().getSession(request, false); + String username = params == null ? null : (String) params.get(PARAM_USERNAME); + + String sessionId = (String) request.getTag(TAG_SESSION_ID); + if (username != null || sessionId != null) { + if (username != null) { + misc.put("username", username); + } + + if (sessionId != null) { + misc.put("sessionId", sessionId); + } + } + } + + @Override + public void responseReceived(HttpRequest request, HttpResponse response, HttpFlowContext context) { + if (request == null && response == null) { + return; + } + + SessionInfo session = context.sessionInfo(); + SocketAddress localAddress = context.channelContext.getLocalAddress(); + SocketAddress remoteAddress = context.channelContext.getRemoteAddress(); + + HttpRequestResponseMetric metric = new HttpRequestResponseMetric(); + + metric.setClientIp(localAddress.getIp().toString()); + metric.setClientPort(localAddress.getPort()); + metric.setServerIp(remoteAddress.getIp().toString()); + metric.setServerPort(remoteAddress.getPort()); + metric.setOrigClientIp(session.getSrcIp().toString()); + metric.setOrigClientPort(session.getSrcPort()); + metric.setOrigServerIp(session.getDstIp().toString()); + metric.setOrigServerPort(session.getDstPort()); + + if (request != null) { + metric.setReqId(request.getId()); + populateHeaders(metric.getReqHdrs(), ALLOWED_REQ_HEADERS, request); + populateMisc(metric.getMisc(), context, request); + + metric.setReqHdrSize((Long) request.getTag(TAG_HEADER_SIZE)); + metric.setReqCntSize((Long) request.getTag(TAG_CONTENT_SIZE)); + metric.setReqStart((Long) request.getTag(TAG_TIME_START)); + metric.setReqStop((Long) request.getTag(TAG_TIME_END)); + + metric.setMethod(request.getMethod().toString()); + metric.setUrl(request.getUrl().toString()); + metric.setReqVersion(request.getVersion().toString()); + } + + if (response != null) { + populateHeaders(metric.getRespHdrs(), ALLOWED_RESP_HEADERS, response); + + metric.setRespHdrSize((Long) response.getTag(TAG_HEADER_SIZE)); + metric.setRespCntSize((Long) response.getTag(TAG_CONTENT_SIZE)); + metric.setRespStart((Long) response.getTag(TAG_TIME_START)); + metric.setRespStop((Long) response.getTag(TAG_TIME_END)); + + metric.setCode(response.getStatus().getCode()); + metric.setReason(response.getStatus().getReasonPhrase().toString()); + metric.setRespVersion(response.getVersion().toString()); + } + + reporterClient.send(metric); + } + + @Override + public void handle(MetricsCollection mc) { + reporterClient.send(mc); + } + + @Override + public void sequenceDetected(MapMetric sequence) { + reporterClient.send(sequence); + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpSequenceEvent.java Thu Sep 21 15:21:35 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,20 +0,0 @@ -package com.passus.st.client.http.filter; - -import java.util.Map; - -/** - * - * @author mikolaj.podbielski - */ -public class HttpSequenceEvent { - - private final Map<String, Object> values; - - public HttpSequenceEvent(Map<String, Object> values) { - this.values = values; - } - - public Map<String, Object> getValues() { - return values; - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpSequenceFilter.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpSequenceFilter.java Thu Sep 21 15:41:50 2017 +0200 @@ -1,6 +1,8 @@ package com.passus.st.client.http.filter; import com.passus.commons.annotations.Plugin; +import com.passus.commons.metric.MapMetric; +import com.passus.commons.service.Registry; import com.passus.config.CMapNode; import com.passus.config.CNode; import com.passus.config.CTupleNode; @@ -23,7 +25,9 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.st.client.http.HttpFlowContext; +import com.passus.st.client.http.ReporterWrapper; import com.passus.st.plugin.PluginConstants; +import java.io.Serializable; import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; @@ -324,7 +328,7 @@ } private void fireEvent(SeqChain chain) { - Map<String, Object> result; + Map<String, Serializable> result; if (values.isEmpty()) { result = Collections.emptyMap(); @@ -333,11 +337,13 @@ Map<String, Object> persisted = chain.valueMap; for (Map.Entry<String, ValueExtractor> e : values.entrySet()) { Object value = e.getValue().extract(persisted); - result.put(e.getKey(), value); + if (value instanceof Serializable) { + result.put(e.getKey(), (Serializable) value); + } } } - listener.sequenceDetected(new HttpSequenceEvent(result)); + listener.sequenceDetected(new MapMetric("sequence", result)); } @Override @@ -371,6 +377,11 @@ } values = (Map<String, ValueExtractor>) config.get("values", Collections.EMPTY_MAP); + + ReporterWrapper reporterWrapper = Registry.getInstance().get("reporterWrapper", ReporterWrapper.class); + if (reporterWrapper != null) { + listener = reporterWrapper; + } } public static class NodeDefCreator implements NodeDefinitionCreator {
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpSequenceListener.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpSequenceListener.java Thu Sep 21 15:41:50 2017 +0200 @@ -1,10 +1,12 @@ package com.passus.st.client.http.filter; +import com.passus.commons.metric.MapMetric; + /** * * @author mikolaj.podbielski */ public interface HttpSequenceListener { - public void sequenceDetected(HttpSequenceEvent event); + public void sequenceDetected(MapMetric sequence); }
--- a/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpSequenceFilterTest.java Thu Sep 21 15:21:35 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpSequenceFilterTest.java Thu Sep 21 15:41:50 2017 +0200 @@ -1,5 +1,6 @@ package com.passus.st.client.http.filter; +import com.passus.commons.metric.MapMetric; import com.passus.commons.utils.ResourceUtils; import com.passus.config.validation.Errors; import com.passus.filter.AndPredicate; @@ -17,6 +18,7 @@ import com.passus.st.AppUtils; import com.passus.st.client.http.filter.HttpSequenceFilter.SequenceItem; import java.io.File; +import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Paths; import java.text.ParseException; @@ -78,7 +80,7 @@ filter.filterInbound(req1, resp1, null); assertEquals(1, listener.events.size()); - Map<String, Object> extracted = listener.events.get(0).getValues(); + Map<String, Serializable> extracted = listener.events.get(0).getAttributesValue(); assertEquals(200, extracted.get("code")); assertEquals("example.com", extracted.get("xhost").toString()); assertEquals("abc", extracted.get("header").toString()); @@ -137,11 +139,11 @@ public static class TestHttpSequenceListener implements HttpSequenceListener { - ArrayList<HttpSequenceEvent> events = new ArrayList<>(); + ArrayList<MapMetric> events = new ArrayList<>(); @Override - public void sequenceDetected(HttpSequenceEvent event) { - events.add(event); + public void sequenceDetected(MapMetric sequence) { + events.add(sequence); } public void reset() {