Mercurial > stress-tester
changeset 818:8abe3a914cc6
Global configuration in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java Wed Jan 17 14:47:57 2018 +0100 @@ -13,6 +13,7 @@ import com.passus.st.client.http.filter.HttpFilterChain; import com.passus.st.emitter.SessionInfo; import com.passus.st.source.EventDestination; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -82,7 +83,7 @@ if (e.getStatus() == SessionStatusEvent.STATUS_CLOSED) { deregisterFlow(e.getSessionInfo()); } - + break; } case HttpSessionPayloadEvent.TYPE: { @@ -114,6 +115,13 @@ filterChain.addFilter(filter); } + @Override + public void setFilters(Collection<HttpFilter> filters) { + Assert.notContainsNull(filters, "filters"); + filterChain.clear(); + filters.forEach((filter) -> filterChain.addFilter(filter)); + } + public void join() { }
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/Client.java Wed Jan 17 14:47:57 2018 +0100 @@ -1,12 +1,13 @@ package com.passus.st.client; import com.passus.commons.service.Service; +import com.passus.config.Configurable; import com.passus.st.metric.MetricSource; /** * * @author Mirosław Hawrot */ -public interface Client extends EventHandler, MetricSource, Service { +public interface Client extends EventHandler, MetricSource, Service, Configurable { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/ClientFactory.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,25 @@ +package com.passus.st.client; + +import com.passus.commons.plugin.PluginFactory; +import com.passus.st.plugin.PluginConstants; + +/** + * + * @author Mirosław Hawrot + */ +public class ClientFactory extends PluginFactory<Client> { + + private static ClientFactory instance; + + public ClientFactory() { + super(PluginConstants.CATEGORY_CLIENT, Client.class); + } + + public static synchronized ClientFactory getInstance() { + if (instance == null) { + instance = new ClientFactory(); + } + + return instance; + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Wed Jan 17 14:47:57 2018 +0100 @@ -1,7 +1,12 @@ package com.passus.st.client.http; import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; import com.passus.commons.service.ServiceException; +import com.passus.config.Configuration; +import com.passus.config.annotations.NodeDefinitionCreate; +import com.passus.config.schema.NodeDefinition; +import com.passus.config.schema.NodeDefinitionCreator; import com.passus.st.client.Client; import com.passus.st.client.Event; import com.passus.st.client.http.filter.HttpFilter; @@ -9,20 +14,39 @@ import com.passus.st.client.http.filter.HttpFilterChain; import com.passus.st.emitter.Emitter; import com.passus.st.metric.MetricsContainer; +import com.passus.st.plugin.PluginConstants; import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; +import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; +import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefInteger; +import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefBool; +import com.passus.config.validation.LongValidator; +import com.passus.st.client.http.filter.HttpFiltersNodeDefinitionCreator; +import java.util.Collection; +import java.util.Collections; /** * * @author Mirosław Hawrot */ +@NodeDefinitionCreate(HttpClient.HttpClientNodeDefCreator.class) +@Plugin(name = HttpClient.TYPE, category = PluginConstants.CATEGORY_CLIENT) public class HttpClient implements Client, HttpFilterAware { + public static final String TYPE = "http"; + + private static final int DEFAULT_WORKERS_NUM = 1; + + private static final boolean DEFAULT_CONNECT_PARTIAL_SESSION = false; + + private static final boolean DEFAULT_COLLECT_METRICS = false; + private static final Logger LOGGER = LogManager.getLogger(HttpClient.class); - private final Emitter emitter; + private Emitter emitter; private HttpClientWorker[] workers; @@ -38,19 +62,30 @@ private String wokerType = "synch"; - private int workersNum = 1; + private int workersNum = DEFAULT_WORKERS_NUM; - private boolean collectMetrics = false; + private boolean collectMetrics = DEFAULT_COLLECT_METRICS; - private boolean connectPartialSession = false; + private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION; private HttpClientWorkerDispatcher dispatcher; + public HttpClient() { + } + public HttpClient(Emitter emitter) { Assert.notNull(emitter, "emitter"); this.emitter = emitter; } + public Emitter getEmitter() { + return emitter; + } + + public void setEmitter(Emitter emitter) { + this.emitter = emitter; + } + public void addListener(HttpClientListener listener) { synchronized (listeners) { listeners.add(listener); @@ -63,6 +98,13 @@ filterChain.addFilter(filter); } + @Override + public void setFilters(Collection<HttpFilter> filters) { + Assert.notContainsNull(filters, "filters"); + filterChain.clear(); + filters.forEach((filter) -> filterChain.addFilter(filter)); + } + public String getWokerType() { return wokerType; } @@ -148,6 +190,14 @@ } @Override + public void configure(Configuration config) { + setFilters((List<HttpFilter>) config.get("filters", Collections.EMPTY_LIST)); + setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM)); + connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION); + collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS); + } + + @Override public boolean isStarted() { return started; } @@ -176,6 +226,10 @@ return; } + if (emitter == null) { + throw new ServiceException("Emitter not set."); + } + HttpClientWorkerFactory factory = HttpClientWorkerFactory.getInstance(); String threadName = getClass().getSimpleName() + "-worker-"; workers = new HttpClientWorker[workersNum]; @@ -257,4 +311,20 @@ } + public static class HttpClientNodeDefCreator implements NodeDefinitionCreator { + + @Override + public NodeDefinition create() { + return mapDef( + tupleDef("connectPartialSession", valueDefBool()).setRequired(false), + tupleDef("collectMetrics", valueDefBool()).setRequired(false), + tupleDef("workers", valueDefInteger() + .addValueValidator(LongValidator.GREATER_ZERO) + ).setRequired(false), + tupleDef("filters", HttpFiltersNodeDefinitionCreator.createFiltersList()).setRequired(false) + ); + } + + } + }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFilterAware.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFilterAware.java Wed Jan 17 14:47:57 2018 +0100 @@ -1,10 +1,15 @@ package com.passus.st.client.http.filter; +import java.util.Collection; + /** * * @author mikolaj.podbielski */ public interface HttpFilterAware { + public void setFilters(Collection<HttpFilter> filters); + public void addFilter(HttpFilter filter); + }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFiltersNodeDefinitionCreator.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFiltersNodeDefinitionCreator.java Wed Jan 17 14:47:57 2018 +0100 @@ -15,7 +15,7 @@ */ public class HttpFiltersNodeDefinitionCreator implements NodeDefinitionCreator { - public NodeDefinition createFiltersList() { + public static NodeDefinition createFiltersList() { KeyValueVaryListNodeDefinition listDef = keyValueVaryListDef("type"); HttpFilterFactory factory = HttpFilterFactory.getInstance(); Collection<PluginInfo> plugins = factory.getAll();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/config/ClientConfigurator.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,85 @@ +package com.passus.st.config; + +import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; +import com.passus.commons.plugin.PluginFactory; +import com.passus.config.CListNode; +import com.passus.config.CNode; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.config.ConfigurationUtils; +import static com.passus.config.ConfigurationUtils.validateType; +import com.passus.config.DomainConfigurator; +import com.passus.config.NodeType; +import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition; +import com.passus.config.schema.NodeDefinition; +import com.passus.config.validation.Errors; +import com.passus.data.type.Type; +import com.passus.st.client.Client; +import com.passus.st.client.ClientFactory; +import com.passus.st.plugin.PluginConstants; +import java.util.List; + +/** + * + * @author Mirosław Hawrot + */ +@Plugin(name = ClientConfigurator.DOMAIN, category = PluginConstants.CATEGORY_DOMAIN_CONFIGURATOR) +public class ClientConfigurator implements DomainConfigurator { + + public static final String DOMAIN = "client"; + + private PluginFactory<Client> clientFactory = ClientFactory.getInstance(); + + private NodeDefinition nodeDef = createNodeDef(clientFactory); + + @Override + public String getDomain() { + return DOMAIN; + } + + public PluginFactory<Client> getClientFactory() { + return clientFactory; + } + + public void setClientFactory(PluginFactory<Client> clientFactory) { + Assert.notNull(clientFactory, "clientFactory"); + this.clientFactory = clientFactory; + nodeDef = createNodeDef(clientFactory); + } + + @Override + public void validate(Configuration config, Errors errors, ConfigurationContext context) { + CNode rootNode = config.getRootNode(); + if (!validateType(rootNode, NodeType.LIST, errors)) { + return; + } + + nodeDef.validate(rootNode, errors, context); + } + + @Override + public void configure(Configuration config, Errors errors, ConfigurationContext context) { + CNode rootNode = config.getRootNode(); + if (!validateType(rootNode, NodeType.LIST, errors)) { + return; + } + + CListNode listNode = (CListNode) nodeDef.transform(rootNode, errors, context); + if (listNode != null) { + try { + List clients = ConfigurationUtils.convertToList(listNode, Type.OBJECT); + context.add("clients", clients); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + } + + private static DynaKeyValueVaryListNodeDefinition createNodeDef(PluginFactory<Client> clientFactory) { + DynaKeyValueVaryListNodeDefinition nodeDef = new DynaKeyValueVaryListNodeDefinition("type", clientFactory); + nodeDef.setTransformToPluginObject(true); + return nodeDef; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/config/TestJobConfigurator.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,124 @@ +package com.passus.st.config; + +import com.passus.commons.Assert; +import com.passus.commons.plugin.PluginFactory; +import com.passus.config.CMapNode; +import com.passus.config.CNode; +import com.passus.config.CTupleNode; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import static com.passus.config.ConfigurationUtils.validateType; +import com.passus.config.Configurator; +import com.passus.config.DomainConfigurator; +import com.passus.config.NodeType; +import com.passus.config.validation.Errors; +import com.passus.st.job.TestJob; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author Mirosław Hawrot + */ +public class TestJobConfigurator implements Configurator { + + private static final Logger LOGGER = LogManager.getLogger(TestJobConfigurator.class); + + private PluginFactory<DomainConfigurator> configuratorFactory = DomainConfiguratorFactory.getInstance(); + + private TestJob job; + + public TestJobConfigurator() { + this.job = new TestJob(); + } + + public TestJobConfigurator(TestJob job) { + Assert.notNull(job, "job"); + this.job = job; + } + + public PluginFactory<DomainConfigurator> getConfiguratorFactory() { + return configuratorFactory; + } + + public TestJob getJob() { + return job; + } + + public void setJob(TestJob job) { + Assert.notNull(job, "job"); + this.job = job; + } + + public void setConfiguratorFactory(PluginFactory<DomainConfigurator> configuratorFactory) { + Assert.notNull(configuratorFactory, "configuratorFactory"); + this.configuratorFactory = configuratorFactory; + } + + private Map<String, DomainConfigurator> getDomainsConfigurator(Configuration config, Errors errors) { + return getDomainsConfigurator(config.getRootNode(), errors); + } + + private Map<String, DomainConfigurator> getDomainsConfigurator(CNode rootNode, Errors errors) { + if (!validateType(rootNode, NodeType.MAP, errors)) { + return null; + } + + CMapNode mapNode = (CMapNode) rootNode; + Map<String, DomainConfigurator> domainsConfigurator = new LinkedHashMap<>(); + List<CTupleNode> tuples = mapNode.getChildren(); + for (CTupleNode tuple : tuples) { + String domain = tuple.getName(); + DomainConfigurator configurator = null; + + try { + configurator = configuratorFactory.getInstanceByName(domain); + } catch (IllegalArgumentException e) { + } + + if (configurator == null) { + errors.reject(tuple, "Unknwon property '%s'.", domain); + break; + } + + domainsConfigurator.put(domain, configurator); + } + + return domainsConfigurator; + } + + @Override + public void validate(Configuration config, Errors errors, ConfigurationContext context) { + Map<String, DomainConfigurator> configurators = getDomainsConfigurator(config, errors); + if (!errors.hasError()) { + for (Entry<String, DomainConfigurator> entry : configurators.entrySet()) { + Configuration subConfig = config.subConfiguration(entry.getKey()); + DomainConfigurator configurator = entry.getValue(); + configurator.validate(subConfig, errors, context); + if (errors.hasError()) { + break; + } + } + } + } + + @Override + public void configure(Configuration config, Errors errors, ConfigurationContext context) { + Map<String, DomainConfigurator> configurators = getDomainsConfigurator(config, errors); + if (!errors.hasError()) { + for (Entry<String, DomainConfigurator> entry : configurators.entrySet()) { + Configuration subConfig = config.subConfiguration(entry.getKey()); + DomainConfigurator configurator = entry.getValue(); + configurator.configure(subConfig, errors, context); + if (errors.hasError()) { + break; + } + } + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/Job.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,13 @@ +package com.passus.st.job; + +/** + * + * @author Mirosław Hawrot + */ +public interface Job { + + public void start(); + + public void stop(); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/JobContext.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,46 @@ +package com.passus.st.job; + +/** + * + * @author Mirosław Hawrot + */ +public class JobContext { + + private final String jobId; + + private final String projectId; + + private long startTime; + + private JobStatus status = JobStatus.NEW; + + public JobContext(String jobId, String projectId) { + this.jobId = jobId; + this.projectId = projectId; + } + + public String getJobId() { + return jobId; + } + + public String getProjectId() { + return projectId; + } + + public long getStartTime() { + return startTime; + } + + void setStartTime(long startTime) { + this.startTime = startTime; + } + + public JobStatus getStatus() { + return status; + } + + void setStatus(JobStatus status) { + this.status = status; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,456 @@ +package com.passus.st.job; + +import com.passus.commons.Assert; +import com.passus.commons.metric.MetricsCollection; +import com.passus.commons.service.Registry; +import com.passus.commons.utils.StringUtils; +import com.passus.st.project.Project; +import com.passus.st.client.MemoryEventsCache; +import com.passus.st.client.http.HttpClient; +import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher; +import com.passus.st.client.http.ReporterDestination; +import com.passus.st.client.http.ReporterFileDestination; +import com.passus.st.emitter.PassThroughSessionMapper; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.metric.ScheduledMetricsCollector; +import com.passus.st.reporter.snmp.SnmpLogger; +import com.passus.st.source.PcapSessionEventSource; +import java.io.File; +import java.text.ParseException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author Mirosław Hawrot + */ +public class JobExecutor { + + private static final Logger LOGGER = LogManager.getLogger(JobExecutor.class); + + private final Map<String, JobThread> jobThreads = new HashMap<>(); + + private final List<JobListener> listeners = new LinkedList<>(); + + public boolean stop(Project project) { + return stop(project, -1); + } + + public boolean stop(Project project, long waitTime) { + synchronized (this) { + JobContext context = getContext(project); + if (context == null || context.getStatus() == JobStatus.STOPPED) { + return false; + } + + JobThread jobThread = jobThreads.get(context.getJobId()); + if (jobThread == null) { + return false; + } + + jobThread.terminate(); + try { + jobThread.interrupt(); + + if (waitTime > 0) { + jobThread.join(waitTime); + } else { + jobThread.join(); + } + } catch (Exception ignore) { + } + + return true; + } + } + + public boolean hasJob(Project project) { + Assert.notNull(project, "project"); + synchronized (this) { + return jobThreads.values().stream() + .anyMatch((t) -> t.context.getProjectId().equals(project.getId())); + } + } + + public boolean removeJob(Project project) { + Assert.notNull(project, "project"); + synchronized (this) { + JobContext context = getContext(project); + if (context != null) { + if (context.getStatus() != JobStatus.STOPPED) { + throw new IllegalArgumentException("Unable to remove project '" + project.getName() + "'. " + + "There is associated job in status != STOPPED."); + } + + jobThreads.remove(context.getJobId()); + fireJobRemoved(context); + return true; + } + } + + return false; + } + + public JobContext getContext(Project project) { + Assert.notNull(project, "project"); + synchronized (this) { + for (JobThread t : jobThreads.values()) { + if (t.context.getProjectId().equals(project.getId())) { + return t.context; + } + } + + return null; + } + } + + public void addListener(JobListener listener) { + Assert.notNull(listener, "listener"); + synchronized (this) { + listeners.add(listener); + } + } + + public boolean removeListener(JobListener listener) { + Assert.notNull(listener, "listener"); + synchronized (this) { + return listeners.remove(listener); + } + } + + private void fireJobRemoved(JobContext context) { + synchronized (this) { + listeners.forEach((listener) -> { + listener.onJobRemoved(context); + }); + } + } + + private void fireJobStatusChanged(JobContext context, JobStatus oldStatus) { + synchronized (this) { + listeners.forEach((listener) -> { + listener.onJobStatusChanged(context, oldStatus); + }); + } + } + + private void fireJobMetrics(JobContext context, MetricsCollection metrics) { + synchronized (this) { + listeners.forEach((listener) -> { + listener.onJobMetrics(context, metrics); + }); + } + } + + private void fireJobError(JobContext context, Throwable cause) { + synchronized (this) { + listeners.forEach((listener) -> { + listener.onJobError(context, cause); + }); + } + } + + public String add(Project project) { + synchronized (this) { + if (hasJob(project)) { + throw new IllegalArgumentException("Job for project '" + project.getName() + "' already exists."); + } + + String jobId = StringUtils.randomString(); + JobContext context = new JobContext(jobId, project.getId()); + JobThread jobThread = new JobThread(context); + jobThread.start(); + jobThreads.put(jobId, jobThread); + return jobId; + } + } + + private class JobThread extends Thread { + + private final JobContext context; + + private Set<PcapSessionEventSource> eventSrcs; + + private HttpClient httpClient; + + private NioEmitter emitter; + + private MemoryEventsCache cache; + + private ScheduledMetricsCollector metricsCollector; + + private SnmpLogger snmpLogger; + + private JobThread(JobContext context) { + super("Job-" + context.getJobId()); + this.context = context; + context.setStatus(JobStatus.WAITING); + } + + private void configureFilters(Project project, HttpClient client) { + /*List<FilterSettings> filtersSettings = project.getHttpFiltersSettings(); + if (filtersSettings.isEmpty()) { + return; + } + + HttpFilterFactory factory = new HttpFilterFactory(); + filtersSettings.forEach((filterSettings) -> { + if (filterSettings.isActive()) { + try { + HttpFilter filter = factory.getInstanceByName(filterSettings.getType()); + filter.configure(filterSettings.getConfig()); + client.addFilter(filter); + } catch (Exception e) { + LOGGER.warn(e.getMessage(), e); + } + } + });*/ + } + + private void configure() { + /*metricsCollector = new ScheduledMetricsCollector(); + metricsCollector.setPeriodTime(2_000); + + //TODO Na razie pobieramy caly projekt, docelowo powinny byc tworzona kopia parametrow + Project project = STApp.PROJECT_MANAGER.load(context.getProjectId()); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Configuring job: {} project id: {}, name: {}", + context.getJobId(), project.getId(), project.getName()); + } + + SessionMapper mapper; + String mapperRule = project.getMapperRule(); + if (mapperRule == null) { + mapper = new PassThroughSessionMapper(); + } else { + // TODO: refactor + RuleBasedSessionMapper rbsm = new RuleBasedSessionMapper(); + try { + rbsm.addRule(mapperRule); + mapper = rbsm; + } catch (ParseException ex) { + mapper = new PassThroughSessionMapper(); + } + } + + Set<String> pcapFiles = project.getPcapFiles(); + if (pcapFiles == null || pcapFiles.isEmpty()) { + throw new IllegalArgumentException("At least one pcap file required."); + } + + emitter = new NioEmitter(); + emitter.setSessionMapper(mapper); + emitter.setCollectMetrics(true); + + httpClient = new HttpClient(emitter); + httpClient.setCollectMetrics(true); + httpClient.setConnectPartialSession(project.isAllowPartialSessions()); + httpClient.setWokerType(project.getWorkerType().toString()); + configureFilters(project, httpClient); + + if (project.getParallelReplays() != null) { + if (pcapFiles.size() != 1) { + throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file."); + } + + httpClient.setWorkersNum(project.getParallelReplays()); + } else { + httpClient.setWorkersNum(pcapFiles.size()); + httpClient.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher()); + } + + if (project.getReplaySpeed() > 0) { + float speed = project.getReplaySpeed(); + if (speed > .1f && speed < 100f) { + httpClient.setSleepFactor(1f / speed); + } + } + eventSrcs = new HashSet<>(); + + for (String pcapFile : pcapFiles) { + PcapSessionEventSource eventSrc = new PcapSessionEventSource(); + eventSrc.setPcapFile(pcapFile); + eventSrc.setAllowPartialSession(project.isAllowPartialSessions()); + eventSrc.setCollectMetrics(true); + eventSrc.getPortsRange().clear(); + eventSrc.getPortsRange().addAll(project.getHttpPortsRange()); + eventSrcs.add(eventSrc); + } + + if (project.isEnableCache()) { + Set<String> sourcesName = new HashSet<>(); + eventSrcs.forEach((eventSrc) -> { + sourcesName.add(eventSrc.getName()); + }); + + cache = new MemoryEventsCache(httpClient, sourcesName); + eventSrcs.forEach((eventSrc) -> { + eventSrc.setHandler(cache); + }); + + cache.setLoop(project.getLoops()); + + } else { + eventSrcs.forEach((eventSrc) -> { + eventSrc.setHandler(httpClient); + eventSrc.setLoops(project.getLoops()); + }); + } + + eventSrcs.forEach((eventSrc) -> { + metricsCollector.register(eventSrc); + }); + + metricsCollector.register(emitter); + metricsCollector.register(httpClient); + metricsCollector.addHandler((mc) -> { + fireJobMetrics(context, mc); + }); + + try { + File projectDir = STApp.PROJECT_MANAGER.getProjectDir(project); + File projectDataDir = new File(projectDir, "Data"); + + ReporterFileDestination reporterDestination = new ReporterFileDestination(projectDataDir); + Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination); + httpClient.addListener(reporterDestination); + metricsCollector.addHandler(reporterDestination); + + if (project.isSnmpEnabled() && project.getSnmpHost() != null) { + String quasiUrl = SnmpLogger.createQuasiUrl(project.getSnmpHost(), project.getSnmpPort()); + String communityString = project.getSnmpCommunity(); + System.out.println("GOING to start SNMP client " + quasiUrl + " " + communityString); + if (communityString == null) { + // TODO: usunac + communityString = "passus"; + } + snmpLogger = new SnmpLogger(quasiUrl, communityString, project.getSnmpPeriod(), projectDataDir); + LOGGER.debug("SNMP logger started."); + } + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + }*/ + } + + private void changeStatus(JobStatus status) { + if (status == JobStatus.RUNNING) { + context.setStartTime(System.currentTimeMillis()); + } + + JobStatus oldStatus = context.getStatus(); + context.setStatus(status); + fireJobStatusChanged(context, oldStatus); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Job {} status changed {} -> {}.", context.getJobId(), oldStatus, status); + } + } + + private void terminate() { + if (context.getStatus() == JobStatus.STOPPED + || context.getStatus() == JobStatus.STOPPING) { + return; + } + + changeStatus(JobStatus.STOPPING); + + metricsCollector.collect(); + + for (PcapSessionEventSource eventSrc : eventSrcs) { + try { + eventSrc.stop(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(e.getMessage(), e); + } + } + } + + try { + httpClient.stop(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(e.getMessage(), e); + } + } + + try { + emitter.stop(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(e.getMessage(), e); + } + } + + if (snmpLogger != null) { + snmpLogger.close(); + LOGGER.debug("SNMP logger stopped."); + } + + metricsCollector.flush(true); + metricsCollector.stop(); + changeStatus(JobStatus.STOPPED); + } + + private void exceptionCaught(Throwable cause) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(cause); + } + + fireJobError(context, cause); + terminate(); + } + + @Override + public void run() { + changeStatus(JobStatus.STARTING); + try { + configure(); + } catch (Exception e) { + exceptionCaught(e); + return; + } + + try { + metricsCollector.start(); + emitter.start(); + httpClient.start(); + eventSrcs.forEach((eventSrc) -> { + eventSrc.start(); + }); + + changeStatus(JobStatus.RUNNING); + + if (cache != null) { + try { + cache.await(); + } catch (InterruptedException ignore) { + } + + cache.send(); + } + } catch (Exception e) { + exceptionCaught(e); + return; + } + + try { + httpClient.join(); + } catch (Exception ignore) { + } + + terminate(); + } + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/JobListener.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,26 @@ +package com.passus.st.job; + +import com.passus.commons.metric.MetricsCollection; + +/** + * + * @author Mirosław Hawrot + */ +public interface JobListener { + + public default void onJobRemoved(JobContext context) { + + } + + public default void onJobStatusChanged(JobContext context, JobStatus oldStatus) { + + } + + public default void onJobError(JobContext context, Throwable cause) { + + } + + public default void onJobMetrics(JobContext context, MetricsCollection collection) { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/JobStatus.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,21 @@ +package com.passus.st.job; + +/** + * + * @author Mirosław Hawrot + */ +public enum JobStatus { + + NEW(0), WAITING(1), STARTING(2), RUNNING(3), SUSPENDED(4), ERROR(5), STOPPING(6), STOPPED(7); + + private final int priority; + + private JobStatus(int priority) { + this.priority = priority; + } + + public int priority() { + return priority; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,19 @@ +package com.passus.st.job; + +/** + * + * @author Mirosław Hawrot + */ +public class TestJob implements Job { + + @Override + public void start() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void stop() { + throw new UnsupportedOperationException("Not supported yet."); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java Wed Jan 17 12:14:07 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java Wed Jan 17 14:47:57 2018 +0100 @@ -15,9 +15,11 @@ public static final String CATEGORY_EVENT_SOURCE = "EventSource"; public static final String CATEGORY_METRICS_COLLECTION_APPENDER = "MetricsCollectionAppender"; - + public static final String CATEGORY_SESSION_MAPPER = "SessionMapper"; - + + public static final String CATEGORY_CLIENT = "Client"; + public static final String CATEGORY_HTTP_CLIENT_WORKER = "ClientWorker"; private PluginConstants() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/project/Project.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,51 @@ +package com.passus.st.project; + +import com.passus.config.Configuration; + +/** + * + * @author Mirosław Hawrot + */ +public class Project { + + private String id; + + private String name; + + private boolean active; + + private Configuration config; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isActive() { + return active; + } + + public void setActive(boolean active) { + this.active = active; + } + + public Configuration getConfig() { + return config; + } + + public void setConfig(Configuration config) { + this.config = config; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/config/ClientConfiguratorTest.java Wed Jan 17 14:47:57 2018 +0100 @@ -0,0 +1,64 @@ +package com.passus.st.config; + +import com.passus.commons.plugin.PluginFactory; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.config.ConfigurationContextImpl; +import com.passus.config.YamlConfigurationReader; +import com.passus.config.validation.Errors; +import com.passus.st.Log4jConfigurationFactory; +import com.passus.st.client.Client; +import com.passus.st.client.http.HttpClient; +import java.io.IOException; +import java.util.List; +import static org.testng.Assert.*; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class ClientConfiguratorTest { + + @Test + public void testConfigure() throws Exception { + Log4jConfigurationFactory.enableFactory("debug"); + String configStr = "client:\n" + + " - type: http\n" + + " connectPartialSession: true\n" + + " collectMetrics: true\n" + + " workers: 4\n"; + /*+ " filters:\n" + + " - type: formAuth\n" + + " active: true\n" + + " loginCheckUrl: /login\n" + + " userField: username\n" + + " passwordField: password\n" + + " applyIf:\n" + + " \"req.url\": {$contains: \"/login\"}\n" + + " provider:\n" + + " UsernamePassword:\n" + + " username: user2\n" + + " password: 'Qwe!23'";*/ + + Configuration config = YamlConfigurationReader.readFromString(configStr); + ClientConfigurator configurator = new ClientConfigurator(); + + Errors errors = new Errors(); + ConfigurationContext context = new ConfigurationContextImpl(); + configurator.configure(config.subConfiguration("client"), errors, context); + assertFalse(errors.hasError()); + + List<Client> clients = (List<Client>) context.get("clients"); + assertTrue(clients.get(0) instanceof HttpClient); + HttpClient httpClient = (HttpClient) clients.get(0); + assertTrue(httpClient.isCollectMetrics()); + assertTrue(httpClient.isConnectPartialSession()); + assertEquals(4, httpClient.getWorkersNum()); + } + +}