changeset 818:8abe3a914cc6

Global configuration in progress
author Devel 2
date Wed, 17 Jan 2018 14:47:57 +0100
parents c5f42c314230
children 53537abab89b
files stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java stress-tester/src/main/java/com/passus/st/client/Client.java stress-tester/src/main/java/com/passus/st/client/ClientFactory.java stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFilterAware.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFiltersNodeDefinitionCreator.java stress-tester/src/main/java/com/passus/st/config/ClientConfigurator.java stress-tester/src/main/java/com/passus/st/config/TestJobConfigurator.java stress-tester/src/main/java/com/passus/st/job/Job.java stress-tester/src/main/java/com/passus/st/job/JobContext.java stress-tester/src/main/java/com/passus/st/job/JobExecutor.java stress-tester/src/main/java/com/passus/st/job/JobListener.java stress-tester/src/main/java/com/passus/st/job/JobStatus.java stress-tester/src/main/java/com/passus/st/job/TestJob.java stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java stress-tester/src/main/java/com/passus/st/project/Project.java stress-tester/src/test/java/com/passus/st/config/ClientConfiguratorTest.java
diffstat 17 files changed, 1025 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/ConverterHttpClient.java	Wed Jan 17 14:47:57 2018 +0100
@@ -13,6 +13,7 @@
 import com.passus.st.client.http.filter.HttpFilterChain;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.source.EventDestination;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.logging.log4j.LogManager;
@@ -82,7 +83,7 @@
                     if (e.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                         deregisterFlow(e.getSessionInfo());
                     }
-                    
+
                     break;
                 }
                 case HttpSessionPayloadEvent.TYPE: {
@@ -114,6 +115,13 @@
         filterChain.addFilter(filter);
     }
 
+    @Override
+    public void setFilters(Collection<HttpFilter> filters) { // replaces the chain's current filters with the given ones
+        Assert.notContainsNull(filters, "filters");
+        filterChain.clear();
+        filters.forEach(filterChain::addFilter);
+    }
+
     public void join() {
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/Client.java	Wed Jan 17 14:47:57 2018 +0100
@@ -1,12 +1,13 @@
 package com.passus.st.client;
 
 import com.passus.commons.service.Service;
+import com.passus.config.Configurable;
 import com.passus.st.metric.MetricSource;
 
 /**
  *
  * @author Mirosław Hawrot
  */
-public interface Client extends EventHandler, MetricSource, Service {
+public interface Client extends EventHandler, MetricSource, Service, Configurable {
     
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/ClientFactory.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,25 @@
+package com.passus.st.client;
+
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.st.plugin.PluginConstants;
+
+/**
+ * Plugin factory typed to {@link Client} and constructed with the {@link PluginConstants#CATEGORY_CLIENT} category.
+ * @author Mirosław Hawrot
+ */
+public class ClientFactory extends PluginFactory<Client> {
+    /** Shared factory; stays null until the first {@link #getInstance()} call. */
+    private static ClientFactory instance;
+    /** Creates a factory for the client plugin category. */
+    public ClientFactory() {
+        super(PluginConstants.CATEGORY_CLIENT, Client.class);
+    }
+    /** Returns the shared factory, creating it on the first call; synchronized on the class. */
+    public static synchronized ClientFactory getInstance() {
+        if (instance != null) {
+            return instance;
+        }
+        instance = new ClientFactory();
+        return instance;
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Wed Jan 17 14:47:57 2018 +0100
@@ -1,7 +1,12 @@
 package com.passus.st.client.http;
 
 import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
 import com.passus.commons.service.ServiceException;
+import com.passus.config.Configuration;
+import com.passus.config.annotations.NodeDefinitionCreate;
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
 import com.passus.st.client.Client;
 import com.passus.st.client.Event;
 import com.passus.st.client.http.filter.HttpFilter;
@@ -9,20 +14,39 @@
 import com.passus.st.client.http.filter.HttpFilterChain;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.metric.MetricsContainer;
+import com.passus.st.plugin.PluginConstants;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefInteger;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.valueDefBool;
+import com.passus.config.validation.LongValidator;
+import com.passus.st.client.http.filter.HttpFiltersNodeDefinitionCreator;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  *
  * @author Mirosław Hawrot
  */
+@NodeDefinitionCreate(HttpClient.HttpClientNodeDefCreator.class)
+@Plugin(name = HttpClient.TYPE, category = PluginConstants.CATEGORY_CLIENT)
 public class HttpClient implements Client, HttpFilterAware {
 
+    public static final String TYPE = "http";
+
+    private static final int DEFAULT_WORKERS_NUM = 1;
+
+    private static final boolean DEFAULT_CONNECT_PARTIAL_SESSION = false;
+
+    private static final boolean DEFAULT_COLLECT_METRICS = false;
+
     private static final Logger LOGGER = LogManager.getLogger(HttpClient.class);
 
-    private final Emitter emitter;
+    private Emitter emitter;
 
     private HttpClientWorker[] workers;
 
@@ -38,19 +62,30 @@
 
     private String wokerType = "synch";
 
-    private int workersNum = 1;
+    private int workersNum = DEFAULT_WORKERS_NUM;
 
-    private boolean collectMetrics = false;
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
-    private boolean connectPartialSession = false;
+    private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION;
 
     private HttpClientWorkerDispatcher dispatcher;
 
+    public HttpClient() { // leaves the emitter unset; one can be supplied later through setEmitter
+    }
+
     public HttpClient(Emitter emitter) {
         Assert.notNull(emitter, "emitter");
         this.emitter = emitter;
     }
 
+    public Emitter getEmitter() { // null if the no-arg constructor was used and no emitter has been set
+        return emitter;
+    }
+
+    public void setEmitter(Emitter emitter) { // unlike the one-arg constructor, performs no null check
+        this.emitter = emitter;
+    }
+
     public void addListener(HttpClientListener listener) {
         synchronized (listeners) {
             listeners.add(listener);
@@ -63,6 +98,13 @@
         filterChain.addFilter(filter);
     }
 
+    @Override
+    public void setFilters(Collection<HttpFilter> filters) { // replaces the chain's current filters with the given ones
+        Assert.notContainsNull(filters, "filters");
+        filterChain.clear();
+        filters.forEach(filterChain::addFilter);
+    }
+
     public String getWokerType() {
         return wokerType;
     }
@@ -148,6 +190,14 @@
     }
 
     @Override
+    public void configure(Configuration config) { // each lookup passes its default: an empty list or a DEFAULT_* constant
+        setFilters((List<HttpFilter>) config.get("filters", Collections.<HttpFilter>emptyList())); // typed default, not raw EMPTY_LIST
+        setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM));
+        connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION);
+        collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
+    }
+
+    @Override
     public boolean isStarted() {
         return started;
     }
@@ -176,6 +226,10 @@
             return;
         }
 
+        if (emitter == null) {
+            throw new ServiceException("Emitter not set.");
+        }
+
         HttpClientWorkerFactory factory = HttpClientWorkerFactory.getInstance();
         String threadName = getClass().getSimpleName() + "-worker-";
         workers = new HttpClientWorker[workersNum];
@@ -257,4 +311,20 @@
 
     }
 
+    public static class HttpClientNodeDefCreator implements NodeDefinitionCreator {
+        /** Builds a map definition with four entries, all marked not required; "workers" gets the GREATER_ZERO validator. */
+        @Override
+        public NodeDefinition create() {
+            return mapDef(
+                    tupleDef("connectPartialSession", valueDefBool()).setRequired(false),
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false),
+                    tupleDef("workers", valueDefInteger()
+                            .addValueValidator(LongValidator.GREATER_ZERO)
+                    ).setRequired(false),
+                    tupleDef("filters", HttpFiltersNodeDefinitionCreator.createFiltersList()).setRequired(false)
+            );
+        }
+
+    }
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFilterAware.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFilterAware.java	Wed Jan 17 14:47:57 2018 +0100
@@ -1,10 +1,15 @@
 package com.passus.st.client.http.filter;
 
+import java.util.Collection;
+
 /**
  *
  * @author mikolaj.podbielski
  */
 public interface HttpFilterAware {
 
+    public void setFilters(Collection<HttpFilter> filters); // bulk variant of addFilter; both implementations in this changeset clear the chain first
+
     public void addFilter(HttpFilter filter);
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFiltersNodeDefinitionCreator.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFiltersNodeDefinitionCreator.java	Wed Jan 17 14:47:57 2018 +0100
@@ -15,7 +15,7 @@
  */
 public class HttpFiltersNodeDefinitionCreator implements NodeDefinitionCreator {
 
-    public NodeDefinition createFiltersList() {
+    public static NodeDefinition createFiltersList() {
         KeyValueVaryListNodeDefinition listDef = keyValueVaryListDef("type");
         HttpFilterFactory factory = HttpFilterFactory.getInstance();
         Collection<PluginInfo> plugins = factory.getAll();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/ClientConfigurator.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,85 @@
+package com.passus.st.config;
+
+import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.config.CListNode;
+import com.passus.config.CNode;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.ConfigurationUtils;
+import static com.passus.config.ConfigurationUtils.validateType;
+import com.passus.config.DomainConfigurator;
+import com.passus.config.NodeType;
+import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition;
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.validation.Errors;
+import com.passus.data.type.Type;
+import com.passus.st.client.Client;
+import com.passus.st.client.ClientFactory;
+import com.passus.st.plugin.PluginConstants;
+import java.util.List;
+
+/**
+ * Domain configurator for the "client" domain; the root node of its configuration must be a list.
+ * @author Mirosław Hawrot
+ */
+@Plugin(name = ClientConfigurator.DOMAIN, category = PluginConstants.CATEGORY_DOMAIN_CONFIGURATOR)
+public class ClientConfigurator implements DomainConfigurator {
+    /** Domain name; also used as the plugin name of this configurator. */
+    public static final String DOMAIN = "client";
+    /** Factory handed to the node definition; defaults to the shared {@link ClientFactory}. */
+    private PluginFactory<Client> clientFactory = ClientFactory.getInstance();
+    /** Node definition built from the factory; rebuilt whenever the factory is replaced. */
+    private NodeDefinition nodeDef = createNodeDef(clientFactory);
+
+    @Override
+    public String getDomain() {
+        return DOMAIN;
+    }
+    /** Returns the plugin factory currently in use. */
+    public PluginFactory<Client> getClientFactory() {
+        return clientFactory;
+    }
+    /** Replaces the plugin factory (asserted non-null) and rebuilds the node definition from it. */
+    public void setClientFactory(PluginFactory<Client> clientFactory) {
+        Assert.notNull(clientFactory, "clientFactory");
+        this.clientFactory = clientFactory;
+        this.nodeDef = createNodeDef(clientFactory);
+    }
+    /** Checks that the root node is a list, then delegates validation to the node definition. */
+    @Override
+    public void validate(Configuration config, Errors errors, ConfigurationContext context) {
+        CNode root = config.getRootNode();
+        // The type check receives the errors collector; only a list root is validated further.
+        boolean listRoot = validateType(root, NodeType.LIST, errors);
+        if (listRoot) {
+            nodeDef.validate(root, errors, context);
+        }
+    }
+    /** Transforms the list node and adds the converted list to the context under "clients". */
+    @Override
+    public void configure(Configuration config, Errors errors, ConfigurationContext context) {
+        CNode root = config.getRootNode();
+        if (!validateType(root, NodeType.LIST, errors)) {
+            return;
+        }
+        CListNode transformed = (CListNode) nodeDef.transform(root, errors, context);
+        if (transformed == null) {
+            return; // nothing is added to the context when the transformation yields no node
+        }
+        try {
+            List clients = ConfigurationUtils.convertToList(transformed, Type.OBJECT);
+            context.add("clients", clients);
+        } catch (Exception e) {
+            // Conversion failures are rethrown unchecked (cause preserved) instead of going through errors.
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+    /** Creates a definition for key "type" and the given factory, with transform-to-plugin-object enabled. */
+    private static DynaKeyValueVaryListNodeDefinition createNodeDef(PluginFactory<Client> clientFactory) {
+        DynaKeyValueVaryListNodeDefinition definition = new DynaKeyValueVaryListNodeDefinition("type", clientFactory);
+        definition.setTransformToPluginObject(true);
+        return definition;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/config/TestJobConfigurator.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,124 @@
+package com.passus.st.config;
+
+import com.passus.commons.Assert;
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.config.CMapNode;
+import com.passus.config.CNode;
+import com.passus.config.CTupleNode;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import static com.passus.config.ConfigurationUtils.validateType;
+import com.passus.config.Configurator;
+import com.passus.config.DomainConfigurator;
+import com.passus.config.NodeType;
+import com.passus.config.validation.Errors;
+import com.passus.st.job.TestJob;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Top-level configurator: each entry of the root map names a domain and is handed to that domain's configurator.
+ * @author Mirosław Hawrot
+ */
+public class TestJobConfigurator implements Configurator {
+
+    private static final Logger LOGGER = LogManager.getLogger(TestJobConfigurator.class);
+    /** Looks up a {@link DomainConfigurator} by domain name. */
+    private PluginFactory<DomainConfigurator> configuratorFactory = DomainConfiguratorFactory.getInstance();
+    /** Job associated with this configurator; validate/configure in this class do not read it. */
+    private TestJob job;
+    /** Creates a configurator with a fresh {@link TestJob}. */
+    public TestJobConfigurator() {
+        this.job = new TestJob();
+    }
+    /** Creates a configurator for the given job (asserted non-null). */
+    public TestJobConfigurator(TestJob job) {
+        Assert.notNull(job, "job");
+        this.job = job;
+    }
+
+    public PluginFactory<DomainConfigurator> getConfiguratorFactory() {
+        return configuratorFactory;
+    }
+
+    public TestJob getJob() {
+        return job;
+    }
+
+    public void setJob(TestJob job) {
+        Assert.notNull(job, "job");
+        this.job = job;
+    }
+
+    public void setConfiguratorFactory(PluginFactory<DomainConfigurator> configuratorFactory) {
+        Assert.notNull(configuratorFactory, "configuratorFactory");
+        this.configuratorFactory = configuratorFactory;
+    }
+    /** Convenience overload that resolves the configurators for the configuration's root node. */
+    private Map<String, DomainConfigurator> getDomainsConfigurator(Configuration config, Errors errors) {
+        return getDomainsConfigurator(config.getRootNode(), errors);
+    }
+    /** Maps each root entry name to its configurator, stopping at the first unknown name; never returns null. */
+    private Map<String, DomainConfigurator> getDomainsConfigurator(CNode rootNode, Errors errors) {
+        if (!validateType(rootNode, NodeType.MAP, errors)) {
+            return new LinkedHashMap<>(); // empty rather than null, so callers can iterate even if no error was recorded
+        }
+
+        CMapNode mapNode = (CMapNode) rootNode;
+        Map<String, DomainConfigurator> domainsConfigurator = new LinkedHashMap<>();
+        List<CTupleNode> tuples = mapNode.getChildren();
+        for (CTupleNode tuple : tuples) {
+            String domain = tuple.getName();
+            DomainConfigurator configurator = null;
+            try {
+                configurator = configuratorFactory.getInstanceByName(domain);
+            } catch (IllegalArgumentException ignored) {
+                // Deliberately ignored: the null check below reports the unknown domain.
+            }
+
+            if (configurator == null) {
+                errors.reject(tuple, "Unknown property '%s'.", domain);
+                break;
+            }
+
+            domainsConfigurator.put(domain, configurator);
+        }
+
+        return domainsConfigurator;
+    }
+    /** Validates each domain section with its configurator, stopping once an error has been reported. */
+    @Override
+    public void validate(Configuration config, Errors errors, ConfigurationContext context) {
+        Map<String, DomainConfigurator> configurators = getDomainsConfigurator(config, errors);
+        if (!errors.hasError()) {
+            for (Entry<String, DomainConfigurator> entry : configurators.entrySet()) {
+                Configuration subConfig = config.subConfiguration(entry.getKey());
+                DomainConfigurator configurator = entry.getValue();
+                configurator.validate(subConfig, errors, context);
+                if (errors.hasError()) {
+                    break;
+                }
+            }
+        }
+    }
+    /** Applies each domain section through its configurator, stopping once an error has been reported. */
+    @Override
+    public void configure(Configuration config, Errors errors, ConfigurationContext context) {
+        Map<String, DomainConfigurator> configurators = getDomainsConfigurator(config, errors);
+        if (!errors.hasError()) {
+            for (Entry<String, DomainConfigurator> entry : configurators.entrySet()) {
+                Configuration subConfig = config.subConfiguration(entry.getKey());
+                DomainConfigurator configurator = entry.getValue();
+                configurator.configure(subConfig, errors, context);
+                if (errors.hasError()) {
+                    break;
+                }
+            }
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/Job.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,13 @@
+package com.passus.st.job;
+
+/**
+ * Minimal lifecycle contract of a job: it can be started and stopped.
+ * @author Mirosław Hawrot
+ */
+public interface Job {
+    /** Starts the job. */
+    void start();
+    /** Stops the job. */
+    void stop();
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/JobContext.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,46 @@
+package com.passus.st.job;
+
+/**
+ * Identity (job id, project id) and mutable run state (status, start time) of a single job.
+ * @author Mirosław Hawrot
+ */
+public class JobContext {
+
+    private final String jobId;
+
+    private final String projectId;
+    // volatile: JobExecutor's job thread writes it without a lock while other threads read it
+    private volatile long startTime;
+    // volatile for the same reason as startTime
+    private volatile JobStatus status = JobStatus.NEW;
+    /** Creates a context in status NEW for the given job and project ids. */
+    public JobContext(String jobId, String projectId) {
+        this.jobId = jobId;
+        this.projectId = projectId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public String getProjectId() {
+        return projectId;
+    }
+    /** Returns the value last stored by setStartTime, or 0 if it was never called. */
+    public long getStartTime() {
+        return startTime;
+    }
+    /** Package-private: only code in com.passus.st.job may set the start time. */
+    void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+    /** Returns the current status; NEW until setStatus is called. */
+    public JobStatus getStatus() {
+        return status;
+    }
+    /** Package-private: only code in com.passus.st.job may change the status. */
+    void setStatus(JobStatus status) {
+        this.status = status;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,456 @@
+package com.passus.st.job;
+
+import com.passus.commons.Assert;
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.commons.service.Registry;
+import com.passus.commons.utils.StringUtils;
+import com.passus.st.project.Project;
+import com.passus.st.client.MemoryEventsCache;
+import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher;
+import com.passus.st.client.http.ReporterDestination;
+import com.passus.st.client.http.ReporterFileDestination;
+import com.passus.st.emitter.PassThroughSessionMapper;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.metric.ScheduledMetricsCollector;
+import com.passus.st.reporter.snmp.SnmpLogger;
+import com.passus.st.source.PcapSessionEventSource;
+import java.io.File;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Runs one background thread per project job and reports job events to the registered listeners.
+ * @author Mirosław Hawrot
+ */
+public class JobExecutor {
+
+    private static final Logger LOGGER = LogManager.getLogger(JobExecutor.class);
+    /** Job threads keyed by job id; accessed while holding this executor's monitor. */
+    private final Map<String, JobThread> jobThreads = new HashMap<>();
+    /** Registered listeners; accessed while holding this executor's monitor. */
+    private final List<JobListener> listeners = new LinkedList<>();
+    /** Stops the project's job and waits without a time limit for its thread to finish. */
+    public boolean stop(Project project) {
+        return stop(project, -1);
+    }
+    /** Stops the project's job, waiting up to waitTime ms (no limit if waitTime <= 0); false if there is nothing to stop. */
+    public boolean stop(Project project, long waitTime) {
+        JobThread jobThread;
+        synchronized (this) {
+            JobContext context = getContext(project);
+            if (context == null || context.getStatus() == JobStatus.STOPPED) {
+                return false;
+            }
+            jobThread = jobThreads.get(context.getJobId());
+            if (jobThread == null) {
+                return false;
+            }
+            jobThread.terminate();
+        }
+        try { // joined outside the monitor: the job thread needs that monitor to notify listeners
+            jobThread.interrupt();
+            if (waitTime > 0) {
+                jobThread.join(waitTime);
+            } else {
+                jobThread.join();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the caller's interrupt status
+        } catch (Exception ignored) {
+            // best effort, as before: any other failure of interrupt()/join() is ignored
+        }
+        return true;
+    }
+    /** Tells whether any job thread belongs to the given project. */
+    public boolean hasJob(Project project) {
+        Assert.notNull(project, "project");
+        synchronized (this) {
+            return jobThreads.values().stream()
+                    .anyMatch((t) -> t.context.getProjectId().equals(project.getId()));
+        }
+    }
+    /** Removes the project's STOPPED job and notifies listeners; throws for any other status; false if there is no job. */
+    public boolean removeJob(Project project) {
+        Assert.notNull(project, "project");
+        synchronized (this) {
+            JobContext context = getContext(project);
+            if (context != null) {
+                if (context.getStatus() != JobStatus.STOPPED) {
+                    throw new IllegalArgumentException("Unable to remove project '" + project.getName() + "'. "
+                            + "There is associated job in status != STOPPED.");
+                }
+
+                jobThreads.remove(context.getJobId());
+                fireJobRemoved(context);
+                return true;
+            }
+        }
+
+        return false;
+    }
+    /** Returns the context of the project's job, or null when the project has none. */
+    public JobContext getContext(Project project) {
+        Assert.notNull(project, "project");
+        synchronized (this) {
+            for (JobThread t : jobThreads.values()) {
+                if (t.context.getProjectId().equals(project.getId())) {
+                    return t.context;
+                }
+            }
+
+            return null;
+        }
+    }
+    /** Registers a listener for job events. */
+    public void addListener(JobListener listener) {
+        Assert.notNull(listener, "listener");
+        synchronized (this) {
+            listeners.add(listener);
+        }
+    }
+    /** Unregisters a listener; returns whether it was registered. */
+    public boolean removeListener(JobListener listener) {
+        Assert.notNull(listener, "listener");
+        synchronized (this) {
+            return listeners.remove(listener);
+        }
+    }
+
+    private void fireJobRemoved(JobContext context) {
+        synchronized (this) {
+            listeners.forEach((listener) -> {
+                listener.onJobRemoved(context);
+            });
+        }
+    }
+
+    private void fireJobStatusChanged(JobContext context, JobStatus oldStatus) {
+        synchronized (this) {
+            listeners.forEach((listener) -> {
+                listener.onJobStatusChanged(context, oldStatus);
+            });
+        }
+    }
+
+    private void fireJobMetrics(JobContext context, MetricsCollection metrics) {
+        synchronized (this) {
+            listeners.forEach((listener) -> {
+                listener.onJobMetrics(context, metrics);
+            });
+        }
+    }
+
+    private void fireJobError(JobContext context, Throwable cause) {
+        synchronized (this) {
+            listeners.forEach((listener) -> {
+                listener.onJobError(context, cause);
+            });
+        }
+    }
+    /** Creates and starts a job thread for the project and returns the new job id; only one job per project. */
+    public String add(Project project) {
+        synchronized (this) {
+            if (hasJob(project)) {
+                throw new IllegalArgumentException("Job for project '" + project.getName() + "' already exists.");
+            }
+
+            String jobId = StringUtils.randomString();
+            JobContext context = new JobContext(jobId, project.getId());
+            JobThread jobThread = new JobThread(context);
+            jobThread.start();
+            jobThreads.put(jobId, jobThread);
+            return jobId;
+        }
+    }
+    /** Thread that runs a single job and holds its event sources, client, emitter and collectors. */
+    private class JobThread extends Thread {
+
+        private final JobContext context;
+
+        private Set<PcapSessionEventSource> eventSrcs;
+
+        private HttpClient httpClient;
+
+        private NioEmitter emitter;
+
+        private MemoryEventsCache cache;
+
+        private ScheduledMetricsCollector metricsCollector;
+
+        private SnmpLogger snmpLogger;
+
+        private JobThread(JobContext context) {
+            super("Job-" + context.getJobId());
+            this.context = context;
+            context.setStatus(JobStatus.WAITING);
+        }
+        /** Currently a no-op: the filter setup below is commented out. */
+        private void configureFilters(Project project, HttpClient client) {
+            /*List<FilterSettings> filtersSettings = project.getHttpFiltersSettings();
+            if (filtersSettings.isEmpty()) {
+                return;
+            }
+
+            HttpFilterFactory factory = new HttpFilterFactory();
+            filtersSettings.forEach((filterSettings) -> {
+                if (filterSettings.isActive()) {
+                    try {
+                        HttpFilter filter = factory.getInstanceByName(filterSettings.getType());
+                        filter.configure(filterSettings.getConfig());
+                        client.addFilter(filter);
+                    } catch (Exception e) {
+                        LOGGER.warn(e.getMessage(), e);
+                    }
+                }
+            });*/
+        }
+        /** Currently a no-op: the whole resource setup below is commented out, so every resource field stays null. */
+        private void configure() {
+            /*metricsCollector = new ScheduledMetricsCollector();
+            metricsCollector.setPeriodTime(2_000);
+
+            //TODO For now the whole project is loaded; eventually a copy of the parameters should be created
+            Project project = STApp.PROJECT_MANAGER.load(context.getProjectId());
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Configuring job: {} project id: {}, name: {}",
+                        context.getJobId(), project.getId(), project.getName());
+            }
+
+            SessionMapper mapper;
+            String mapperRule = project.getMapperRule();
+            if (mapperRule == null) {
+                mapper = new PassThroughSessionMapper();
+            } else {
+                // TODO: refactor
+                RuleBasedSessionMapper rbsm = new RuleBasedSessionMapper();
+                try {
+                    rbsm.addRule(mapperRule);
+                    mapper = rbsm;
+                } catch (ParseException ex) {
+                    mapper = new PassThroughSessionMapper();
+                }
+            }
+
+            Set<String> pcapFiles = project.getPcapFiles();
+            if (pcapFiles == null || pcapFiles.isEmpty()) {
+                throw new IllegalArgumentException("At least one pcap file required.");
+            }
+
+            emitter = new NioEmitter();
+            emitter.setSessionMapper(mapper);
+            emitter.setCollectMetrics(true);
+
+            httpClient = new HttpClient(emitter);
+            httpClient.setCollectMetrics(true);
+            httpClient.setConnectPartialSession(project.isAllowPartialSessions());
+            httpClient.setWokerType(project.getWorkerType().toString());
+            configureFilters(project, httpClient);
+
+            if (project.getParallelReplays() != null) {
+                if (pcapFiles.size() != 1) {
+                    throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file.");
+                }
+
+                httpClient.setWorkersNum(project.getParallelReplays());
+            } else {
+                httpClient.setWorkersNum(pcapFiles.size());
+                httpClient.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher());
+            }
+
+            if (project.getReplaySpeed() > 0) {
+                float speed = project.getReplaySpeed();
+                if (speed > .1f && speed < 100f) {
+                    httpClient.setSleepFactor(1f / speed);
+                }
+            }
+            eventSrcs = new HashSet<>();
+
+            for (String pcapFile : pcapFiles) {
+                PcapSessionEventSource eventSrc = new PcapSessionEventSource();
+                eventSrc.setPcapFile(pcapFile);
+                eventSrc.setAllowPartialSession(project.isAllowPartialSessions());
+                eventSrc.setCollectMetrics(true);
+                eventSrc.getPortsRange().clear();
+                eventSrc.getPortsRange().addAll(project.getHttpPortsRange());
+                eventSrcs.add(eventSrc);
+            }
+
+            if (project.isEnableCache()) {
+                Set<String> sourcesName = new HashSet<>();
+                eventSrcs.forEach((eventSrc) -> {
+                    sourcesName.add(eventSrc.getName());
+                });
+
+                cache = new MemoryEventsCache(httpClient, sourcesName);
+                eventSrcs.forEach((eventSrc) -> {
+                    eventSrc.setHandler(cache);
+                });
+
+                cache.setLoop(project.getLoops());
+
+            } else {
+                eventSrcs.forEach((eventSrc) -> {
+                    eventSrc.setHandler(httpClient);
+                    eventSrc.setLoops(project.getLoops());
+                });
+            }
+
+            eventSrcs.forEach((eventSrc) -> {
+                metricsCollector.register(eventSrc);
+            });
+
+            metricsCollector.register(emitter);
+            metricsCollector.register(httpClient);
+            metricsCollector.addHandler((mc) -> {
+                fireJobMetrics(context, mc);
+            });
+
+            try {
+                File projectDir = STApp.PROJECT_MANAGER.getProjectDir(project);
+                File projectDataDir = new File(projectDir, "Data");
+
+                ReporterFileDestination reporterDestination = new ReporterFileDestination(projectDataDir);
+                Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination);
+                httpClient.addListener(reporterDestination);
+                metricsCollector.addHandler(reporterDestination);
+
+                if (project.isSnmpEnabled() && project.getSnmpHost() != null) {
+                    String quasiUrl = SnmpLogger.createQuasiUrl(project.getSnmpHost(), project.getSnmpPort());
+                    String communityString = project.getSnmpCommunity();
+                    System.out.println("GOING to start SNMP client " + quasiUrl + " " + communityString);
+                    if (communityString == null) {
+                        // TODO: remove
+                        communityString = "passus";
+                    }
+                    snmpLogger = new SnmpLogger(quasiUrl, communityString, project.getSnmpPeriod(), projectDataDir);
+                    LOGGER.debug("SNMP logger started.");
+                }
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }*/
+        }
+        /** Stores the new status (and the start time when RUNNING) and notifies listeners with the old status. */
+        private void changeStatus(JobStatus status) {
+            if (status == JobStatus.RUNNING) {
+                context.setStartTime(System.currentTimeMillis());
+            }
+
+            JobStatus oldStatus = context.getStatus();
+            context.setStatus(status);
+            fireJobStatusChanged(context, oldStatus);
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Job {} status changed {} -> {}.", context.getJobId(), oldStatus, status);
+            }
+        }
+        /** Stops every resource that was created and moves the job to STOPPED; does nothing if already stopping or stopped. */
+        private void terminate() {
+            JobStatus current = context.getStatus();
+            if (current == JobStatus.STOPPED || current == JobStatus.STOPPING) {
+                return;
+            }
+
+            changeStatus(JobStatus.STOPPING);
+            // Any resource may still be null if startup failed before configure() assigned it.
+            if (metricsCollector != null) {
+                metricsCollector.collect();
+            }
+            if (eventSrcs != null) {
+                for (PcapSessionEventSource eventSrc : eventSrcs) {
+                    try {
+                        eventSrc.stop();
+                    } catch (Exception e) {
+                        LOGGER.debug(e.getMessage(), e);
+                    }
+                }
+            }
+
+            try {
+                if (httpClient != null) {
+                    httpClient.stop();
+                }
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+            try {
+                if (emitter != null) {
+                    emitter.stop();
+                }
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+            if (snmpLogger != null) {
+                snmpLogger.close();
+                LOGGER.debug("SNMP logger stopped.");
+            }
+            if (metricsCollector != null) {
+                metricsCollector.flush(true);
+                metricsCollector.stop();
+            }
+            changeStatus(JobStatus.STOPPED);
+        }
+        /** Reports the failure to the listeners, then terminates the job. */
+        private void exceptionCaught(Throwable cause) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(cause);
+            }
+
+            fireJobError(context, cause);
+            terminate();
+        }
+        /** Job lifecycle: configure, start the resources, wait for the client to finish, then terminate. */
+        @Override
+        public void run() {
+            changeStatus(JobStatus.STARTING);
+            try {
+                configure();
+            } catch (Exception e) {
+                exceptionCaught(e);
+                return;
+            }
+
+            try {
+                metricsCollector.start();
+                emitter.start();
+                httpClient.start();
+                eventSrcs.forEach((eventSrc) -> {
+                    eventSrc.start();
+                });
+
+                changeStatus(JobStatus.RUNNING);
+
+                if (cache != null) {
+                    try {
+                        cache.await();
+                    } catch (InterruptedException ignore) {
+                    }
+
+                    cache.send();
+                }
+            } catch (Exception e) {
+                exceptionCaught(e);
+                return;
+            }
+
+            try {
+                httpClient.join();
+            } catch (Exception ignore) {
+            }
+
+            terminate();
+        }
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/JobListener.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,26 @@
+package com.passus.st.job;
+
+import com.passus.commons.metric.MetricsCollection;
+
+/**
+ * Listener for job events. Every callback has an empty default body, so implementations
+ * override only the ones they need.
+ * @author Mirosław Hawrot
+ */
+public interface JobListener {
+    /** Hook for a removed job (going by its name); does nothing unless overridden. */
+    default void onJobRemoved(JobContext context) {
+    }
+
+    /** Hook for a status change; {@code oldStatus} is presumably the status before the change. */
+    default void onJobStatusChanged(JobContext context, JobStatus oldStatus) {
+    }
+
+    /** Hook for a job error reported as {@code cause}; does nothing unless overridden. */
+    default void onJobError(JobContext context, Throwable cause) {
+    }
+
+    /** Hook for job metrics handed over as a {@link MetricsCollection}; does nothing unless overridden. */
+    default void onJobMetrics(JobContext context, MetricsCollection collection) {
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/JobStatus.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,21 @@
+package com.passus.st.job;
+
+/**
+ * Statuses a job can be in; each constant carries the integer priority it is declared with.
+ *
+ * @author Mirosław Hawrot
+ */
+public enum JobStatus {
+    NEW(0), WAITING(1), STARTING(2), RUNNING(3), SUSPENDED(4), ERROR(5), STOPPING(6), STOPPED(7);
+
+    private final int priority;
+
+    JobStatus(int value) {
+        this.priority = value;
+    }
+
+    /** Returns the integer this constant was declared with. */
+    public int priority() {
+        return this.priority;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,19 @@
+package com.passus.st.job;
+
+/**
+ * Placeholder {@link Job}: start() and stop() always throw UnsupportedOperationException.
+ * @author Mirosław Hawrot
+ */
+public class TestJob implements Job {
+    private static final String NOT_SUPPORTED = "Not supported yet.";
+
+    @Override
+    public void start() {
+        throw new UnsupportedOperationException(NOT_SUPPORTED);
+    }
+
+    @Override
+    public void stop() {
+        throw new UnsupportedOperationException(NOT_SUPPORTED);
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Wed Jan 17 12:14:07 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Wed Jan 17 14:47:57 2018 +0100
@@ -15,9 +15,11 @@
     public static final String CATEGORY_EVENT_SOURCE = "EventSource";
 
     public static final String CATEGORY_METRICS_COLLECTION_APPENDER = "MetricsCollectionAppender";
-    
+
     public static final String CATEGORY_SESSION_MAPPER = "SessionMapper";
-    
+
+    public static final String CATEGORY_CLIENT = "Client";
+
     public static final String CATEGORY_HTTP_CLIENT_WORKER = "ClientWorker";
 
     private PluginConstants() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/project/Project.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,51 @@
+package com.passus.st.project;
+
+import com.passus.config.Configuration;
+
+/**
+ * Plain mutable holder for a project's id, name, active flag and {@link Configuration}.
+ *
+ * @author Mirosław Hawrot
+ */
+public class Project {
+    private String id;
+    private String name;
+    private boolean active;
+    private Configuration config;
+
+    /** Returns the id as last set; {@code null} if it was never set. */
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** Returns the name as last set; {@code null} if it was never set. */
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** Returns the active flag; {@code false} until {@link #setActive(boolean)} changes it. */
+    public boolean isActive() {
+        return this.active;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    /** Returns the configuration as last set; {@code null} if it was never set. */
+    public Configuration getConfig() {
+        return this.config;
+    }
+
+    public void setConfig(Configuration config) {
+        this.config = config;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/config/ClientConfiguratorTest.java	Wed Jan 17 14:47:57 2018 +0100
@@ -0,0 +1,64 @@
+package com.passus.st.config;
+
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.ConfigurationContextImpl;
+import com.passus.config.YamlConfigurationReader;
+import com.passus.config.validation.Errors;
+import com.passus.st.Log4jConfigurationFactory;
+import com.passus.st.client.Client;
+import com.passus.st.client.http.HttpClient;
+import java.io.IOException;
+import java.util.List;
+import static org.testng.Assert.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that {@link ClientConfigurator} turns a YAML "client" section into an {@link HttpClient}.
+ *
+ * @author Mirosław Hawrot
+ */
+public class ClientConfiguratorTest {
+    @Test
+    public void testConfigure() throws Exception {
+        Log4jConfigurationFactory.enableFactory("debug");
+        String configStr = "client:\n"
+                + "    - type: http\n"
+                + "      connectPartialSession: true\n"
+                + "      collectMetrics: true\n"
+                + "      workers: 4\n";
+                /*+ "      filters:\n"
+                + "        - type: formAuth\n"
+                + "          active: true\n"
+                + "          loginCheckUrl: /login\n"
+                + "          userField: username\n"
+                + "          passwordField: password\n"
+                + "          applyIf:\n"
+                + "              \"req.url\": {$contains: \"/login\"}\n"
+                + "          provider:\n"
+                + "            UsernamePassword:\n"
+                + "                username: user2\n"
+                + "                password: 'Qwe!23'";*/
+
+        Configuration config = YamlConfigurationReader.readFromString(configStr);
+        ClientConfigurator configurator = new ClientConfigurator();
+        Errors errors = new Errors();
+        ConfigurationContext context = new ConfigurationContextImpl();
+        configurator.configure(config.subConfiguration("client"), errors, context);
+        assertFalse(errors.hasError());
+
+        @SuppressWarnings("unchecked") // the value stored under "clients" is cast without a runtime check
+        List<Client> clients = (List<Client>) context.get("clients");
+        assertTrue(clients.get(0) instanceof HttpClient);
+        HttpClient httpClient = (HttpClient) clients.get(0);
+        assertTrue(httpClient.isCollectMetrics());
+        assertTrue(httpClient.isConnectPartialSession());
+        // TestNG's signature is assertEquals(actual, expected), so the measured value comes first.
+        assertEquals(httpClient.getWorkersNum(), 4);
+    }
+}