changeset 874:a4f8de13c0c9

GlobalConfigMain in progress
author Devel 2
date Fri, 26 Jan 2018 13:21:08 +0100
parents c2bbb7c5b635
children 1bb2a1f9b982
files stress-tester/src/main/java/com/passus/st/GlobalConfigMain.java stress-tester/src/main/java/com/passus/st/client/Client.java stress-tester/src/main/java/com/passus/st/job/TestJob.java stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java
diffstat 6 files changed, 249 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/GlobalConfigMain.java	Fri Jan 26 13:21:08 2018 +0100
@@ -0,0 +1,71 @@
+package com.passus.st;
+
+import com.passus.config.Configuration;
+import com.passus.config.YamlConfigurationReader;
+import com.passus.config.validation.Errors;
+import com.passus.st.job.TestJob;
+import org.apache.commons.cli.*;
+
+import java.io.File;
+
+public class GlobalConfigMain {
+
+    private final CliHelper cliHelper = new CliHelper();
+
+    private Options createOptions() {
+        CliOptions options = cliHelper.options();
+        options.addLogLevelOption();
+        return options;
+    }
+
+    static void printHelp(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("[options] <config file>", "description", options, "");
+    }
+
+    private void start(String[] args) {
+        AppUtils.registerAll();
+        Options options = createOptions();
+
+        try {
+            CommandLine cl = new DefaultParser().parse(options, args);
+            String[] clArgs = cl.getArgs();
+            if (clArgs.length < 1) {
+                System.err.println("Configuration file required.");
+                printHelp(options);
+                return;
+            }
+
+            cliHelper.configureLogger(cl);
+            Errors errors = new Errors();
+            File configFile = new File(clArgs[0]);
+            Configuration config = YamlConfigurationReader.readFromFile(configFile);
+
+            TestJob testJob = TestJob.create(config, errors);
+            if (errors.hasError()) {
+                StringBuilder sb = new StringBuilder();
+
+                errors.getAllErrors().forEach(error -> {
+                    sb.append(error.getMessage()).append("\n");
+                });
+
+                cliHelper.printError(sb.toString());
+            }
+
+            testJob.start();
+
+        } catch (ParseException e) {
+            System.out.println(e.getMessage());
+            printHelp(options);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        } finally {
+            AppUtils.unregisterAll();
+        }
+    }
+
+    public static void main(String[] args) {
+        new GlobalConfigMain().start(args);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java	Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/Client.java	Fri Jan 26 13:21:08 2018 +0100
@@ -2,12 +2,17 @@
 
 import com.passus.commons.service.Service;
 import com.passus.config.Configurable;
+import com.passus.st.emitter.Emitter;
 import com.passus.st.metric.MetricSource;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface Client extends EventHandler, MetricSource, Service, Configurable {
-    
+
+    public Emitter getEmitter();
+
+    public void setEmitter(Emitter emitter);
+
+
 }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Fri Jan 26 13:21:08 2018 +0100
@@ -1,15 +1,30 @@
 package com.passus.st.job;
 
+import com.passus.commons.Assert;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.validation.Errors;
 import com.passus.st.client.Client;
+import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpClientListener;
 import com.passus.st.client.http.ReporterDestination;
+import com.passus.st.config.TestJobConfigurator;
 import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.PassThroughSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsCollector;
+import com.passus.st.metric.ScheduledMetricsCollector;
 import com.passus.st.source.EventSource;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import static com.passus.st.utils.ConfigurationContextConsts.*;
+
 /**
  * @author Mirosław Hawrot
  */
@@ -25,11 +40,97 @@
 
     private List<ReporterDestination> reporterDestinations = new ArrayList<>();
 
+    private Emitter defaultEmitter;
+
+    private List<HttpClientListener> httpClientListener;
+
+    private MetricsCollector metricsCollector = new ScheduledMetricsCollector();
+
     public TestJob() {
     }
 
+    public List<EventSource> getEventSources() {
+        return eventSources;
+    }
+
+    public void setEventSources(List<EventSource> eventSources) {
+        Assert.notContainsNull(eventSources, "eventSources");
+        this.eventSources.clear();
+        this.eventSources.addAll(eventSources);
+    }
+
+    public List<Emitter> getEmitters() {
+        return emitters;
+    }
+
+    public void setEmitters(List<Emitter> emitters) {
+        Assert.notContainsNull(emitters, "emitters");
+        this.emitters.clear();
+        this.emitters.addAll(emitters);
+    }
+
+    public Emitter getDefaultEmitter() {
+        return defaultEmitter;
+    }
+
+    public void setDefaultEmitter(Emitter defaultEmitter) {
+        this.defaultEmitter = defaultEmitter;
+    }
+
+    public MetricsCollector getMetricsCollector() {
+        return metricsCollector;
+    }
+
+    public void setMetricsCollector(MetricsCollector metricsCollector) {
+        this.metricsCollector = metricsCollector;
+    }
+
+    private void registerMetricSource(MetricSource metricSource) {
+        if (metricsCollector != null) {
+            metricsCollector.register(metricSource);
+        }
+    }
+
+    private void prepare() {
+        LOGGER.debug("Preparing job...");
+        if (defaultEmitter == null) {
+            if (emitters.isEmpty()) {
+                defaultEmitter = new NioEmitter();
+                defaultEmitter.setSessionMapper(new PassThroughSessionMapper());
+                emitters.add(defaultEmitter);
+            } else {
+                defaultEmitter = emitters.get(0);
+            }
+        }
+
+        emitters.forEach(emitter -> {
+            if (emitter.isCollectMetrics()) {
+                registerMetricSource(emitter);
+            }
+        });
+
+        clients.forEach(client -> {
+            if (client.getEmitter() == null) {
+                client.setEmitter(defaultEmitter);
+            }
+
+            if (httpClientListener != null
+                    && !httpClientListener.isEmpty()
+                    && client instanceof HttpClient) {
+                httpClientListener.forEach(((HttpClient) client)::addListener);
+            }
+
+            if (client.isCollectMetrics()) {
+                registerMetricSource(client);
+            }
+        });
+    }
+
     @Override
     public void start() {
+        prepare();
+        metricsCollector.start();
+        emitters.forEach(Emitter::start);
         clients.forEach(Client::start);
     }
 
@@ -37,6 +138,45 @@
     public void stop() {
         clients.forEach(Client::stop);
         emitters.forEach(Emitter::stop);
+
+        reporterDestinations.forEach(ReporterDestination::stop);
+
+        if (metricsCollector != null) {
+            metricsCollector.flush(true);
+        }
     }
 
+    private static <T> void populateIfNotNull(Collection<T> srcCollection, Collection<T> dstCollection) {
+        if (srcCollection != null) {
+            dstCollection.addAll(srcCollection);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static TestJob create(Configuration config, Errors errors) {
+        Assert.notNull(config, "config");
+        TestJobConfigurator configurator = new TestJobConfigurator();
+        ConfigurationContext context = ConfigurationContext.create();
+        configurator.configure(config, errors, context);
+        if (errors.hasError()) {
+            return null;
+        }
+
+        return create(context);
+    }
+
+    public static TestJob create(ConfigurationContext context) {
+        Assert.notNull(context, "context");
+        TestJob testJob = new TestJob();
+        List<EventSource> cfgEventSources = (List<EventSource>) context.get(EVENT_SOURCE_SOURCES);
+        populateIfNotNull(cfgEventSources, testJob.eventSources);
+
+        List<Client> cfgClients = (List<Client>) context.get(CLIENT_CLIENTS);
+        populateIfNotNull(cfgClients, testJob.clients);
+
+        List<Emitter> cfgEmitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
+        populateIfNotNull(cfgEmitters, testJob.emitters);
+        testJob.defaultEmitter = (Emitter) context.get(EMITTER_DEFAULT_EMITTER);
+        return testJob;
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java	Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java	Fri Jan 26 13:21:08 2018 +0100
@@ -6,6 +6,7 @@
 import com.passus.st.Builder;
 import com.passus.st.client.Client;
 import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpClientListener;
 import com.passus.st.client.http.ReporterDestination;
 import com.passus.st.config.TestJobConfigurator;
 import com.passus.st.emitter.Emitter;
@@ -39,6 +40,10 @@
 
     private final List<ReporterDestination> reporterDestinations = new ArrayList<>();
 
+    private Emitter defaultEmitter;
+
+    private List<HttpClientListener> httpClientListener;
+
     private MetricsCollector metricsCollector = new ScheduledMetricsCollector();
 
     public static NcEventSource createNcEventSource() {
@@ -165,6 +170,8 @@
 
         List<Emitter> cfgEmitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
         populateIfNotNull(cfgEmitters, emitters);
+        defaultEmitter = (Emitter) context.get(EMITTER_DEFAULT_EMITTER);
+
     }
 
     public static TestJobBuilder get() {
@@ -185,6 +192,27 @@
 
     private TestJob createTestJob() {
         TestJob job = new TestJob();
+
+        if (defaultEmitter == null) {
+            if (emitters.isEmpty()) {
+                defaultEmitter = new NioEmitter();
+            } else {
+                defaultEmitter = new NioEmitter();
+            }
+        }
+
+        clients.forEach((client) -> {
+            if (client.getEmitter() == null) {
+                client.setEmitter(defaultEmitter);
+            }
+
+            if (httpClientListener != null
+                    && !httpClientListener.isEmpty()
+                    && client instanceof HttpClient) {
+                httpClientListener.forEach(((HttpClient) client)::addListener);
+            }
+        });
+
         return job;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java	Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java	Fri Jan 26 13:21:08 2018 +0100
@@ -13,6 +13,8 @@
 
     public void unregister(MetricSource source);
 
+    public void flush(boolean force);
+
     public void collect();
 
     public List<MetricsCollectionHandler> getHandlers();
--- a/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java	Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java	Fri Jan 26 13:21:08 2018 +0100
@@ -139,6 +139,7 @@
         }
     }
 
+    @Override
     public void flush(boolean force) {
         long now = System.currentTimeMillis();
         ArrayDeque<MetricsBatchTimeWindowRange> expired;