changeset 1126:ea20a5ad6d38

Scanner in progress
author Devel 2
date Fri, 05 Jun 2020 12:24:20 +0200
parents 4c188c94dff0
children ae2d0e7c5f5b
files stress-tester/src/main/java/com/passus/st/metric/MetricUtils.java stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java stress-tester/src/main/java/com/passus/st/metric/SimpleMetricCollector.java stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java stress-tester/src/main/java/com/passus/st/scanner/HttpScannerPayloadAnalyzer.java stress-tester/src/main/java/com/passus/st/scanner/Scanner.java stress-tester/src/main/java/com/passus/st/scanner/ScannerAnalyzer.java stress-tester/src/main/java/com/passus/st/scanner/ScannerConfiguratorPluginFactory.java stress-tester/src/main/java/com/passus/st/scanner/ScannerEventHandler.java stress-tester/src/main/java/com/passus/st/scanner/ScannerJob.java stress-tester/src/main/java/com/passus/st/scanner/ScannerPayloadAnalyzer.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java
diffstat 12 files changed, 541 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricUtils.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,65 @@
+package com.passus.st.metric;
+
+import com.passus.commons.ConversionException;
+import com.passus.commons.metric.Metric;
+import com.passus.st.utils.PeriodFormatter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers for presenting collected metrics on the console.
+ */
+public class MetricUtils {
+
+    private MetricUtils() {
+    }
+
+    /**
+     * Prints the elapsed run time and every attribute of the given metrics to {@code System.out}.
+     * <p>
+     * Nothing is printed when {@code startTime} is {@code 0}. A {@code null} metrics list is
+     * treated like an empty one.
+     *
+     * @param metrics   metrics to print, each as its name followed by its attribute values
+     * @param startTime start of the run as a {@link System#currentTimeMillis()} timestamp
+     */
+    public static void printMetrics(List<Metric> metrics, long startTime) {
+        if (startTime == 0) {
+            return;
+        }
+
+        long elapsed = System.currentTimeMillis() - startTime;
+
+        String elapsedLine;
+        try {
+            elapsedLine = "Elapsed time: " + PeriodFormatter.INSTANCE.reverseTransform(elapsed) + ".";
+        } catch (ConversionException e) {
+            // Formatting failed: report the raw value instead of silently dropping the line.
+            elapsedLine = "Elapsed time: " + elapsed + " ms.";
+        }
+
+        // The whole report is written under the stream's monitor, presumably so that other
+        // threads cannot interleave their output with it.
+        synchronized (System.out) {
+            System.out.println("");
+            System.out.println(elapsedLine);
+            System.out.println("Metrics:");
+
+            if (metrics == null) {
+                return;
+            }
+
+            // %n instead of \n so that the line ends match the ones println() emits.
+            String line = "%24s: %s%n";
+            for (Metric metric : metrics) {
+                System.out.println(metric.getName() + ":");
+                Map<String, Serializable> valuesMap = metric.getAttributesValue();
+                for (Map.Entry<String, Serializable> entry : valuesMap.entrySet()) {
+                    System.out.printf(line, entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java	Fri Jun 05 09:47:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java	Fri Jun 05 12:24:20 2020 +0200
@@ -6,16 +6,16 @@
 import com.passus.commons.time.ScheduledTimerService;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class ScheduledMetricsCollector implements MetricsCollector, TimeAware {
@@ -51,7 +51,7 @@
 
     @Override
     public boolean isStarted() {
-        throw new UnsupportedOperationException("Not supported yet.");
+        return started;
     }
 
     @Override
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/metric/SimpleMetricCollector.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,136 @@
+package com.passus.st.metric;
+
+import com.passus.commons.Assert;
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.st.client.PerNameMetricsContainer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Metrics collector without scheduling: sources are polled only when {@link #collect()} is
+ * called and handlers are notified only on a forced {@link #flush(boolean)}.
+ */
+public class SimpleMetricCollector implements MetricsCollector {
+
+    private final List<MetricSource> sources = new ArrayList<>();
+
+    private final List<MetricsCollectionHandler> handlers = new ArrayList<>();
+
+    private final PerNameMetricsContainer container = new PerNameMetricsContainer();
+
+    // Written by start()/stop(); volatile so that isStarted() may be called from any thread.
+    private volatile boolean started;
+
+    /**
+     * @return the container that accumulates the metrics written by the registered sources
+     */
+    public PerNameMetricsContainer getContainer() {
+        return container;
+    }
+
+    /**
+     * @return {@code true} between a call to {@link #start()} and the following {@link #stop()}
+     */
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Marks the collector as started. There is no background work to launch.
+     */
+    @Override
+    public void start() {
+        started = true;
+    }
+
+    /**
+     * Marks the collector as stopped. Already collected metrics are kept.
+     */
+    @Override
+    public void stop() {
+        started = false;
+    }
+
+    /**
+     * Adds a source that will be polled by {@link #collect()}.
+     *
+     * @param source source to add, must not be {@code null}
+     */
+    @Override
+    public void register(MetricSource source) {
+        Assert.notNull(source, "source");
+        synchronized (this) {
+            sources.add(source);
+        }
+    }
+
+    /**
+     * Removes a previously registered source.
+     *
+     * @param source source to remove, must not be {@code null}
+     */
+    @Override
+    public void unregister(MetricSource source) {
+        Assert.notNull(source, "source");
+        synchronized (this) {
+            sources.remove(source);
+        }
+    }
+
+    /**
+     * @return read-only view of the registered handlers
+     */
+    @Override
+    public List<MetricsCollectionHandler> getHandlers() {
+        return Collections.unmodifiableList(handlers);
+    }
+
+    @Override
+    public void addHandler(MetricsCollectionHandler handler) {
+        Assert.notNull(handler, "handler");
+        handlers.add(handler);
+    }
+
+    @Override
+    public void removeHandler(MetricsCollectionHandler handler) {
+        handlers.remove(handler);
+    }
+
+    /**
+     * Passes a snapshot of all collected metrics to every handler.
+     *
+     * @param force the handlers are notified only when this is {@code true}; a non-forced
+     *              flush does nothing
+     */
+    @Override
+    public void flush(boolean force) {
+        if (!force) {
+            return;
+        }
+
+        // The two leading constructor arguments are left at 0 (presumably the time range of
+        // the collection, which is not tracked here - TODO confirm).
+        MetricsCollection metricsCollection = new MetricsCollection(0, 0, new ArrayList<>(container.getMetrics()));
+        for (MetricsCollectionHandler handler : handlers) {
+            handler.handle(metricsCollection);
+        }
+    }
+
+    /**
+     * Lets every registered source write its metrics into the container.
+     */
+    @Override
+    public void collect() {
+        synchronized (this) {
+            for (MetricSource source : sources) {
+                source.writeMetrics(container);
+            }
+        }
+
+        flush(false);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Fri Jun 05 09:47:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Fri Jun 05 12:24:20 2020 +0200
@@ -31,6 +31,8 @@
 
     public static final String CATEGORY_GENERATOR = "Generator";
 
+    public static final String CATEGORY_SCANNER_ANALYZER = "ScannerAnalyzer";
+
     private PluginConstants() {
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/HttpScannerPayloadAnalyzer.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,88 @@
+package com.passus.st.scanner;
+
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.SessionPayloadEvent;
+import com.passus.st.client.http.HttpMetric;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+
+import java.io.IOException;
+
+import static com.passus.st.Protocols.HTTP;
+
+/**
+ * Payload analyzer that records the status code and the content size of HTTP responses in a
+ * {@link HttpMetric}.
+ */
+public class HttpScannerPayloadAnalyzer extends ScannerPayloadAnalyzer implements MetricSource {
+
+    // Non-null only between activate() and deactivate().
+    private HttpMetric metric;
+
+    @Override
+    public void activate() {
+        metric = new HttpMetric();
+        metric.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        // Tolerate deactivation of an analyzer that was never activated (or twice in a row).
+        if (metric != null) {
+            metric.deactivate();
+            metric = null;
+        }
+    }
+
+    /**
+     * @return always {@code false}; {@link #setCollectMetrics(boolean)} has no effect
+     */
+    @Override
+    public boolean isCollectMetrics() {
+        return false;
+    }
+
+    /**
+     * Ignored; the value is not stored.
+     */
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        // Nothing to report while the analyzer is not active.
+        if (metric != null) {
+            container.update(metric);
+        }
+    }
+
+    /**
+     * Updates the metric with the response carried by an HTTP payload event. Events of other
+     * protocols, events without a response and events seen while the analyzer is not active
+     * are skipped.
+     */
+    @Override
+    protected void analyzePayload(SessionPayloadEvent event) {
+        if (metric == null || event.getProtocolId() != HTTP) {
+            return;
+        }
+
+        HttpResponse resp = (HttpResponse) event.getResponse();
+        if (resp == null) {
+            return;
+        }
+
+        metric.addResponseStatusCode(resp.getStatus().getCode());
+        if (resp.hasContent()) {
+            try {
+                metric.addResponseSize(resp.getContentByteBuff().length());
+            } catch (IOException ignored) {
+                // The content could not be read, so only the size sample is skipped.
+            }
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/Scanner.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,120 @@
+package com.passus.st.scanner;
+
+import com.passus.commons.metric.Metric;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.YamlConfigurationReader;
+import com.passus.config.validation.Errors;
+import com.passus.st.AppUtils;
+import com.passus.st.CliHelper;
+import com.passus.st.CliOptions;
+import com.passus.st.config.TestJobConfigurator;
+import com.passus.st.metric.MetricsCollector;
+import com.passus.st.metric.SummaryMetricsCollectionHandler;
+import org.apache.commons.cli.*;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.util.List;
+
+import static com.passus.st.metric.MetricUtils.printMetrics;
+
+/**
+ * Command line entry point of the scanner: reads a YAML configuration, runs a {@link ScannerJob}
+ * built from it and prints the collected metrics.
+ */
+public class Scanner {
+
+    private final CliHelper cliHelper = new CliHelper();
+
+    private final SummaryMetricsCollectionHandler summMetricsHandler = new SummaryMetricsCollectionHandler();
+
+    private Options createOptions() {
+        CliOptions options = cliHelper.options();
+        options.addLogLevelOption();
+        return options;
+    }
+
+    static void printHelp(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("[options] <config file>", "description", options, "");
+    }
+
+    List<Metric> getMetrics() {
+        return summMetricsHandler.getMetrics();
+    }
+
+    /**
+     * Runs one scan. The job is not started when the configuration fails validation.
+     *
+     * @param args command line arguments; the first non-option argument is the path of the
+     *             configuration file
+     */
+    void start(String... args) {
+        AppUtils.registerAll();
+        Options options = createOptions();
+
+        try {
+            CommandLine cl = new DefaultParser().parse(options, args);
+            String[] clArgs = cl.getArgs();
+            if (clArgs.length < 1) {
+                System.err.println("Configuration file required.");
+                printHelp(options);
+                return;
+            }
+
+            cliHelper.configureLogger(cl);
+            Errors errors = new Errors();
+            File configFile = new File(clArgs[0]);
+            Configuration config = YamlConfigurationReader.readFromFile(configFile);
+
+            ConfigurationContext context = ConfigurationContext.create();
+            ScannerConfiguratorPluginFactory pluginFactory = new ScannerConfiguratorPluginFactory();
+
+            TestJobConfigurator testJobConfigurator = new TestJobConfigurator();
+            testJobConfigurator.setConfiguratorFactory(pluginFactory);
+
+            testJobConfigurator.configure(config, errors, context);
+            if (errors.hasError()) {
+                StringBuilder sb = new StringBuilder();
+                errors.getAllErrors().forEach(error -> sb.append(error.getMessage()).append("\n"));
+                cliHelper.printError(sb.toString());
+                // Do not run a job built from a configuration that failed validation.
+                return;
+            }
+
+            ScannerJob job = ScannerJob.create(context);
+
+            final long startTime = System.currentTimeMillis();
+            MetricsCollector metricsCollector = job.getMetricsCollector();
+
+            if (metricsCollector != null) {
+                metricsCollector.addHandler(summMetricsHandler);
+            }
+
+            final Logger logger = LogManager.getLogger("Scanner");
+            logger.debug("Scanner started");
+            job.start();
+            // ScannerJob.stop() first waits for its event handler thread to finish.
+            job.stop();
+
+            testJobConfigurator.destroy(config, errors, context);
+
+            printMetrics(getMetrics(), startTime);
+            logger.debug("Scanner stopped");
+        } catch (ParseException e) {
+            System.out.println(e.getMessage());
+            printHelp(options);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        } finally {
+            AppUtils.unregisterAll();
+        }
+    }
+
+    public static void main(String... args) {
+        new Scanner().start(args);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/ScannerAnalyzer.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,29 @@
+package com.passus.st.scanner;
+
+import com.passus.st.client.Event;
+
+/**
+ * Consumer of the events produced during a scan.
+ */
+public interface ScannerAnalyzer {
+
+    /**
+     * Lifecycle hook; does nothing unless overridden.
+     */
+    default void activate() {
+    }
+
+    /**
+     * Lifecycle hook; does nothing unless overridden.
+     */
+    default void deactivate() {
+    }
+
+    /**
+     * Processes a single event.
+     *
+     * @param event event to analyze
+     */
+    void analyze(Event event);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/ScannerConfiguratorPluginFactory.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,20 @@
+package com.passus.st.scanner;
+
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.config.DomainConfigurator;
+import com.passus.st.source.EventSourceConfigurator;
+
+/**
+ * Plugin factory of the domain configurators available to the scanner.
+ */
+public class ScannerConfiguratorPluginFactory extends PluginFactory<DomainConfigurator> {
+
+    /**
+     * Creates the factory with {@link EventSourceConfigurator} added under its domain name.
+     */
+    public ScannerConfiguratorPluginFactory() {
+        super();
+        this.add(EventSourceConfigurator.DOMAIN, EventSourceConfigurator.class);
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/ScannerEventHandler.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,92 @@
+package com.passus.st.scanner;
+
+import com.passus.st.client.DataEvents;
+import com.passus.st.client.Event;
+import com.passus.st.client.EventHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread that takes the events passed to {@link #handle(Event)} from a bounded queue and feeds
+ * them to the analyzers. It terminates after processing a {@code DataEnd} event or when it is
+ * interrupted.
+ */
+public class ScannerEventHandler extends Thread implements EventHandler {
+
+    private static final Logger LOGGER = LogManager.getLogger(ScannerEventHandler.class);
+
+    private static final int QUEUE_CAPACITY = 100;
+
+    private static final long OFFER_TIMEOUT_MS = 100;
+
+    private final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
+
+    // Cleared by the consumer thread, read by the threads calling handle().
+    private volatile boolean working = true;
+
+    private final ScannerAnalyzer[] analyzers;
+
+    public ScannerEventHandler(Collection<ScannerAnalyzer> analyzers) {
+        this.analyzers = analyzers.toArray(new ScannerAnalyzer[0]);
+    }
+
+    /**
+     * Queues the event for analysis. When the queue is full the caller is blocked until the
+     * consumer thread makes room, instead of failing with "Queue full". Events arriving after
+     * the consumer has finished are dropped.
+     *
+     * @param event event to queue
+     */
+    @Override
+    public void handle(Event event) {
+        try {
+            // A timed offer (rather than put) lets a blocked producer notice that the consumer is gone.
+            while (working) {
+                if (queue.offer(event, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller; the event is dropped.
+            Thread.currentThread().interrupt();
+            LOGGER.debug("Interrupted while queueing event.", e);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (working) {
+                Event event;
+                try {
+                    event = queue.take();
+                } catch (InterruptedException e) {
+                    // Interruption is a request to stop; restore the flag and leave the loop.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+
+                for (ScannerAnalyzer analyzer : analyzers) {
+                    try {
+                        analyzer.analyze(event);
+                    } catch (Exception e) {
+                        // A failing analyzer must not prevent the other ones from seeing the event.
+                        LOGGER.debug(e.getMessage(), e);
+                    }
+                }
+
+                if (event.getType() == DataEvents.DataEnd.TYPE) {
+                    break;
+                }
+            }
+        } finally {
+            // Release producers waiting in handle(), whatever the reason for leaving the loop.
+            working = false;
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/ScannerJob.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,145 @@
+package com.passus.st.scanner;
+
+import com.passus.commons.Assert;
+import com.passus.commons.service.Service;
+import com.passus.config.ConfigurationContext;
+import com.passus.st.job.Job;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsCollectionHandler;
+import com.passus.st.metric.SimpleMetricCollector;
+import com.passus.st.source.EventSource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static com.passus.st.utils.ConfigurationContextConsts.EVENT_SOURCE_SOURCES;
+
+/**
+ * Job that routes the events of its event sources through a {@link ScannerEventHandler} to the
+ * scanner analyzers and gathers their metrics in a {@link SimpleMetricCollector}.
+ */
+public class ScannerJob implements Job {
+
+    private static final Logger LOGGER = LogManager.getLogger(ScannerJob.class);
+
+    // Created by start(); null while the job is not running.
+    private ScannerEventHandler eventHandler;
+
+    private final List<EventSource> eventSources = new ArrayList<>();
+
+    private final SimpleMetricCollector metricsCollector = new SimpleMetricCollector();
+
+    private final List<ScannerAnalyzer> analyzers = new ArrayList<>();
+
+    public ScannerJob() {
+        analyzers.add(new HttpScannerPayloadAnalyzer());
+    }
+
+    public List<EventSource> getEventSources() {
+        return eventSources;
+    }
+
+    /**
+     * Replaces the event sources of the job.
+     *
+     * @param eventSources new event sources, must not contain {@code null}
+     */
+    public void setEventSources(List<EventSource> eventSources) {
+        Assert.notContainsNull(eventSources, "eventSources");
+        // Copy first: the argument may be the very list returned by getEventSources(), which
+        // clear() would empty before its elements are added back.
+        List<EventSource> copy = new ArrayList<>(eventSources);
+        this.eventSources.clear();
+        this.eventSources.addAll(copy);
+    }
+
+    public SimpleMetricCollector getMetricsCollector() {
+        return metricsCollector;
+    }
+
+    /**
+     * Activates the analyzers, starts the event handler thread and then starts every event
+     * source with that handler attached.
+     */
+    @Override
+    public void start() {
+        analyzers.forEach(a -> {
+            if (a instanceof MetricSource) {
+                metricsCollector.register((MetricSource) a);
+            }
+
+            a.activate();
+        });
+
+        eventHandler = new ScannerEventHandler(analyzers);
+        metricsCollector.start();
+
+        // Start the consumer before the sources so that queued events are drained.
+        eventHandler.start();
+        eventSources.forEach(src -> {
+            src.setHandler(eventHandler);
+            if (src instanceof MetricSource) {
+                MetricSource es = (MetricSource) src;
+                if (es.isCollectMetrics()) {
+                    metricsCollector.register(es);
+                }
+            }
+            src.start();
+        });
+    }
+
+    /**
+     * Waits until the event handler thread has finished, then stops the event sources, flushes
+     * the collected metrics to the handlers and deactivates the analyzers. Does nothing when
+     * the job is not running.
+     */
+    @Override
+    public void stop() {
+        if (eventHandler == null) {
+            return;
+        }
+
+        try {
+            eventHandler.join();
+        } catch (InterruptedException e) {
+            // Carry on with the shutdown, but keep the interrupt visible to the caller.
+            Thread.currentThread().interrupt();
+        }
+        eventHandler = null;
+        eventSources.forEach(EventSource::stop);
+
+        metricsCollector.collect();
+        metricsCollector.flush(true);
+        metricsCollector.stop();
+
+        List<MetricsCollectionHandler> handlers = metricsCollector.getHandlers();
+        if (handlers != null) {
+            for (MetricsCollectionHandler handler : handlers) {
+                if (handler instanceof Service) {
+                    ((Service) handler).stop();
+                }
+            }
+        }
+
+        analyzers.forEach(ScannerAnalyzer::deactivate);
+    }
+
+    private static <T> void populateIfNotNull(Collection<T> srcCollection, Collection<T> dstCollection) {
+        if (srcCollection != null) {
+            dstCollection.addAll(srcCollection);
+        }
+    }
+
+    public static ScannerJob create(ConfigurationContext context) {
+        Assert.notNull(context, "context");
+        ScannerJob job = new ScannerJob();
+
+        List<EventSource> cfgEventSources = context.get(EVENT_SOURCE_SOURCES);
+        populateIfNotNull(cfgEventSources, job.eventSources);
+        return job;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/ScannerPayloadAnalyzer.java	Fri Jun 05 12:24:20 2020 +0200
@@ -0,0 +1,32 @@
+package com.passus.st.scanner;
+
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionPayloadEvent;
+
+/**
+ * Base class of analyzers that are interested in session payload events only.
+ */
+public abstract class ScannerPayloadAnalyzer implements ScannerAnalyzer {
+
+    /**
+     * Forwards session payload events to {@link #analyzePayload(SessionPayloadEvent)} and
+     * ignores events of any other type.
+     */
+    @Override
+    public void analyze(Event event) {
+        boolean payloadEvent = SessionPayloadEvent.TYPE == event.getType();
+        if (!payloadEvent) {
+            return;
+        }
+
+        analyzePayload((SessionPayloadEvent) event);
+    }
+
+    /**
+     * Handles a single session payload event.
+     *
+     * @param event payload event to analyze
+     */
+    protected abstract void analyzePayload(SessionPayloadEvent event);
+
+}
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Jun 05 09:47:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Fri Jun 05 12:24:20 2020 +0200
@@ -527,7 +527,7 @@
 
             return mapDef(
                     tupleDef("fileName", STRING_DEF),
-                    tupleDef("loops", INT_GREATER_EQUAL_ZERO_DEF),
+                    tupleDef("loops", INT_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("loopDelay", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("sessionProc", sessionProcDef).setRequired(false),
                     tupleDef("collectMetrics", BOOLEAN_DEF).setRequired(false),