Mercurial > stress-tester
changeset 874:a4f8de13c0c9
GlobalConfigMain in progress
author | Devel 2 |
---|---|
date | Fri, 26 Jan 2018 13:21:08 +0100 |
parents | c2bbb7c5b635 |
children | 1bb2a1f9b982 |
files | stress-tester/src/main/java/com/passus/st/GlobalConfigMain.java stress-tester/src/main/java/com/passus/st/client/Client.java stress-tester/src/main/java/com/passus/st/job/TestJob.java stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java |
diffstat | 6 files changed, 249 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/GlobalConfigMain.java Fri Jan 26 13:21:08 2018 +0100
@@ -0,0 +1,86 @@
+package com.passus.st;
+
+import com.passus.config.Configuration;
+import com.passus.config.YamlConfigurationReader;
+import com.passus.config.validation.Errors;
+import com.passus.st.job.TestJob;
+import org.apache.commons.cli.*;
+
+import java.io.File;
+
+/**
+ * Command-line entry point: reads a YAML configuration file, builds a
+ * {@link TestJob} from it and starts the job.
+ */
+public class GlobalConfigMain {
+
+    private final CliHelper cliHelper = new CliHelper();
+
+    /** Builds the command-line options, including the log level option. */
+    private Options createOptions() {
+        CliOptions options = cliHelper.options();
+        options.addLogLevelOption();
+        return options;
+    }
+
+    /** Prints the usage text for the given options. */
+    static void printHelp(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("[options] <config file>", "description", options, "");
+    }
+
+    /**
+     * Parses the command line, loads the configuration file named by the first
+     * positional argument, then creates and starts the test job.
+     *
+     * @param args raw command-line arguments
+     */
+    private void start(String[] args) {
+        AppUtils.registerAll();
+        Options options = createOptions();
+
+        try {
+            CommandLine cl = new DefaultParser().parse(options, args);
+            String[] clArgs = cl.getArgs();
+            if (clArgs.length < 1) {
+                System.err.println("Configuration file required.");
+                printHelp(options);
+                return;
+            }
+
+            cliHelper.configureLogger(cl);
+            Errors errors = new Errors();
+            File configFile = new File(clArgs[0]);
+            Configuration config = YamlConfigurationReader.readFromFile(configFile);
+
+            TestJob testJob = TestJob.create(config, errors);
+            if (errors.hasError()) {
+                StringBuilder sb = new StringBuilder();
+
+                errors.getAllErrors().forEach(error -> {
+                    sb.append(error.getMessage()).append("\n");
+                });
+
+                cliHelper.printError(sb.toString());
+                // TestJob.create returns null when validation failed, so there is
+                // no job to start; bail out instead of dereferencing null below.
+                return;
+            }
+
+            testJob.start();
+
+        } catch (ParseException e) {
+            System.out.println(e.getMessage());
+            printHelp(options);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        } finally {
+            AppUtils.unregisterAll();
+        }
+    }
+
+    public static void main(String[] args) {
+        new GlobalConfigMain().start(args);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/Client.java Fri Jan 26 13:21:08 2018 +0100
@@ -2,12 +2,17 @@
 
 import com.passus.commons.service.Service;
 import com.passus.config.Configurable;
+import com.passus.st.emitter.Emitter;
 import com.passus.st.metric.MetricSource;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface Client extends EventHandler, MetricSource, Service, Configurable {
-    
+    /** Returns the emitter assigned to this client; may be {@code null}, in which case TestJob assigns its default emitter. */
+    public Emitter getEmitter();
+
+    /** Assigns the emitter this client should use. */
+    public void setEmitter(Emitter emitter);
+
 }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java Fri Jan 26 13:21:08 2018 +0100
@@ -1,15 +1,30 @@
 package com.passus.st.job;
 
+import com.passus.commons.Assert;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.validation.Errors;
 import com.passus.st.client.Client;
+import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpClientListener;
 import com.passus.st.client.http.ReporterDestination;
+import com.passus.st.config.TestJobConfigurator;
 import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.PassThroughSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsCollector;
+import com.passus.st.metric.ScheduledMetricsCollector;
 import com.passus.st.source.EventSource;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import static com.passus.st.utils.ConfigurationContextConsts.*;
+
 /**
  * @author Mirosław Hawrot
  */
@@ -25,11 +40,114 @@
 
     private List<ReporterDestination> reporterDestinations = new ArrayList<>();
 
+    private Emitter defaultEmitter;
+
+    private List<HttpClientListener> httpClientListener;
+
+    private MetricsCollector metricsCollector = new ScheduledMetricsCollector();
+
     public TestJob() {
     }
 
+    /** Returns the internal, mutable list of event sources (not a copy). */
+    public List<EventSource> getEventSources() {
+        return eventSources;
+    }
+
+    /** Replaces the current event sources with the elements of the given list. */
+    public void setEventSources(List<EventSource> eventSources) {
+        Assert.notContainsNull(eventSources, "eventSources");
+        this.eventSources.clear();
+        this.eventSources.addAll(eventSources);
+    }
+
+    /** Returns the internal, mutable list of emitters (not a copy). */
+    public List<Emitter> getEmitters() {
+        return emitters;
+    }
+
+    /** Replaces the current emitters with the elements of the given list. */
+    public void setEmitters(List<Emitter> emitters) {
+        Assert.notContainsNull(emitters, "emitters");
+        this.emitters.clear();
+        this.emitters.addAll(emitters);
+    }
+
+    /** Returns the emitter given to clients that have none; may be {@code null} before the job is prepared. */
+    public Emitter getDefaultEmitter() {
+        return defaultEmitter;
+    }
+
+    public void setDefaultEmitter(Emitter defaultEmitter) {
+        this.defaultEmitter = defaultEmitter;
+    }
+
+    public MetricsCollector getMetricsCollector() {
+        return metricsCollector;
+    }
+
+    /** Sets the metrics collector; with {@code null} the job runs without registering, starting or flushing metrics. */
+    public void setMetricsCollector(MetricsCollector metricsCollector) {
+        this.metricsCollector = metricsCollector;
+    }
+
+    /** Registers the source with the metrics collector, if one is set. */
+    private void registerMetricSource(MetricSource metricSource) {
+        if (metricsCollector != null) {
+            metricsCollector.register(metricSource);
+        }
+    }
+
+    /**
+     * Wires the job together before it is started: chooses the default emitter
+     * (the first configured one, or a new {@link NioEmitter} when none exist),
+     * hands it to clients without an emitter, attaches HTTP client listeners
+     * and registers metric-collecting emitters and clients with the collector.
+     */
+    private void prepare() {
+        LOGGER.debug("Preparing job...");
+        if (defaultEmitter == null) {
+            if (emitters.isEmpty()) {
+                defaultEmitter = new NioEmitter();
+                defaultEmitter.setSessionMapper(new PassThroughSessionMapper());
+                emitters.add(defaultEmitter);
+            } else {
+                defaultEmitter = emitters.get(0);
+            }
+        }
+
+        emitters.forEach(emitter -> {
+            if (emitter.isCollectMetrics()) {
+                registerMetricSource(emitter);
+            }
+        });
+
+        clients.forEach(client -> {
+            if (client.getEmitter() == null) {
+                client.setEmitter(defaultEmitter);
+            }
+
+            if (httpClientListener != null
+                    && !httpClientListener.isEmpty()
+                    && client instanceof HttpClient) {
+                httpClientListener.forEach(((HttpClient) client)::addListener);
+            }
+
+            if (client.isCollectMetrics()) {
+                registerMetricSource(client);
+            }
+        });
+    }
+
     @Override
     public void start() {
+        prepare();
+        // The collector is optional (see setMetricsCollector), so guard it here
+        // the same way stop() and registerMetricSource() do.
+        if (metricsCollector != null) {
+            metricsCollector.start();
+        }
+
         emitters.forEach(Emitter::start);
         clients.forEach(Client::start);
     }
@@ -37,6 +155,59 @@
     public void stop() {
         clients.forEach(Client::stop);
         emitters.forEach(Emitter::stop);
+
+        reporterDestinations.forEach(ReporterDestination::stop);
+
+        if (metricsCollector != null) {
+            metricsCollector.flush(true);
+        }
     }
 
+    private static <T> void populateIfNotNull(Collection<T> srcCollection, Collection<T> dstCollection) {
+        if (srcCollection != null) {
+            dstCollection.addAll(srcCollection);
+        }
+    }
+
+    /**
+     * Creates a job from the given configuration.
+     *
+     * @param config configuration to read; must not be null
+     * @param errors receives the validation errors reported while configuring
+     * @return the job, or {@code null} when {@code errors} holds errors afterwards
+     */
+    public static TestJob create(Configuration config, Errors errors) {
+        Assert.notNull(config, "config");
+        TestJobConfigurator configurator = new TestJobConfigurator();
+        ConfigurationContext context = ConfigurationContext.create();
+        configurator.configure(config, errors, context);
+        if (errors.hasError()) {
+            return null;
+        }
+
+        return create(context);
+    }
+
+    /**
+     * Creates a job from an already populated configuration context. The context
+     * hands its values back untyped, hence the unchecked casts.
+     *
+     * @param context context holding the configured sources, clients and emitters
+     * @return a new job filled with whatever the context provides
+     */
+    @SuppressWarnings("unchecked")
+    public static TestJob create(ConfigurationContext context) {
+        Assert.notNull(context, "context");
+        TestJob testJob = new TestJob();
+        List<EventSource> cfgEventSources = (List<EventSource>) context.get(EVENT_SOURCE_SOURCES);
+        populateIfNotNull(cfgEventSources, testJob.eventSources);
+
+        List<Client> cfgClients = (List<Client>) context.get(CLIENT_CLIENTS);
+        populateIfNotNull(cfgClients, testJob.clients);
+
+        List<Emitter> cfgEmitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
+        populateIfNotNull(cfgEmitters, testJob.emitters);
+        testJob.defaultEmitter = (Emitter) context.get(EMITTER_DEFAULT_EMITTER);
+        return testJob;
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJobBuilder.java Fri Jan 26 13:21:08 2018 +0100
@@ -6,6 +6,7 @@
 import com.passus.st.Builder;
 import com.passus.st.client.Client;
 import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpClientListener;
 import com.passus.st.client.http.ReporterDestination;
 import com.passus.st.config.TestJobConfigurator;
 import com.passus.st.emitter.Emitter;
@@ -39,6 +40,10 @@
 
     private final List<ReporterDestination> reporterDestinations = new ArrayList<>();
 
+    private Emitter defaultEmitter;
+
+    private List<HttpClientListener> httpClientListener;
+
     private MetricsCollector metricsCollector = new ScheduledMetricsCollector();
 
     public static NcEventSource createNcEventSource() {
@@ -165,6 +170,8 @@
 
         List<Emitter> cfgEmitters = (List<Emitter>) context.get(EMITTER_EMITTERS);
         populateIfNotNull(cfgEmitters, emitters);
+        defaultEmitter = (Emitter) context.get(EMITTER_DEFAULT_EMITTER);
+
     }
 
     public static TestJobBuilder get() {
@@ -185,6 +192,30 @@
 
     private TestJob createTestJob() {
         TestJob job = new TestJob();
+
+        if (defaultEmitter == null) {
+            if (emitters.isEmpty()) {
+                defaultEmitter = new NioEmitter();
+            } else {
+                // Reuse the first configured emitter, as TestJob.prepare() does,
+                // instead of creating a second, unconfigured NioEmitter.
+                defaultEmitter = emitters.get(0);
+            }
+        }
+
+        // Give every client an emitter and attach the HTTP listeners to HTTP clients.
+        clients.forEach((client) -> {
+            if (client.getEmitter() == null) {
+                client.setEmitter(defaultEmitter);
+            }
+
+            if (httpClientListener != null
+                    && !httpClientListener.isEmpty()
+                    && client instanceof HttpClient) {
+                httpClientListener.forEach(((HttpClient) client)::addListener);
+            }
+        });
+
         return job;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java Fri Jan 26 13:20:15 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsCollector.java Fri Jan 26 13:21:08 2018 +0100
@@ -13,6 +13,9 @@
 
     public void unregister(MetricSource source);
 
+    /** Flushes collected metrics. NOTE(review): the meaning of {@code force} is not visible here; confirm against the implementations (TestJob.stop() passes true). */
+    public void flush(boolean force);
+
     public void collect();
 
     public List<MetricsCollectionHandler> getHandlers();
--- a/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java Fri Jan 26 13:20:15 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/metric/ScheduledMetricsCollector.java Fri Jan 26 13:21:08 2018 +0100 @@ -139,6 +139,7 @@ } } + @Override public void flush(boolean force) { long now = System.currentTimeMillis(); ArrayDeque<MetricsBatchTimeWindowRange> expired;