Mercurial > stress-tester
changeset 965:acb064dc26b9
Refactorization in progress
author | Devel 2 |
---|---|
date | Fri, 31 May 2019 11:40:55 +0200 |
parents | d1e54d1d3e1e |
children | 74eb9b0e8b37 |
files | stress-tester/src/main/java/com/passus/st/client/Client.java stress-tester/src/main/java/com/passus/st/client/ClientFactory.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientListener.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java stress-tester/src/main/java/com/passus/st/client/http/HttpSourceNameAwareClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/config/ClientNodeDefinitionCreator.java stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java |
diffstat | 11 files changed, 5 insertions(+), 739 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/Client.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,39 +0,0 @@ -package com.passus.st.client; - -import com.passus.commons.service.Service; -import com.passus.config.Configurable; -import com.passus.st.emitter.Emitter; -import com.passus.st.metric.MetricSource; - -import java.util.Collection; -import java.util.List; - -/** - * - * - * @author Mirosław Hawrot - */ -@Deprecated -public interface Client extends EventHandler, MetricSource, Service, Configurable { - - Emitter getEmitter(); - - void setEmitter(Emitter emitter); - - void join() throws InterruptedException; - - void setListeners(Collection<ClientListener> listeners); - - void addListener(ClientListener listener); - - void removeListener(ClientListener listener); - - List<ClientListener> getListeners(); - - List<FlowFilter> getFilters(); - - void addFilter(FlowFilter filter); - - void setFilters(Collection<FlowFilter> filters); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientFactory.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -package com.passus.st.client; - -import com.passus.commons.plugin.PluginFactory; -import com.passus.st.plugin.PluginConstants; - -/** - * - * @author Mirosław Hawrot - */ -@Deprecated -public class ClientFactory extends PluginFactory<Client> { - - private static ClientFactory instance; - - public ClientFactory() { - super(PluginConstants.CATEGORY_CLIENT, Client.class); - } - - public static synchronized ClientFactory getInstance() { - if (instance == null) { - instance = new ClientFactory(); - } - - return instance; - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java Fri May 31 11:37:28 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java Fri May 31 11:40:55 2019 +0200 @@ -1,7 +1,6 @@ package com.passus.st.client; import com.passus.commons.plugin.PluginFactory; -import com.passus.st.client.http.HttpClientWorker; import com.passus.st.plugin.PluginConstants; /**
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,406 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.commons.service.ServiceException; -import com.passus.config.Configuration; -import com.passus.config.ConfigurationContext; -import com.passus.config.ValueTransformer; -import com.passus.config.annotations.NodeDefinitionCreate; -import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition; -import com.passus.config.schema.NodeDefinition; -import com.passus.config.schema.NodeDefinitionCreator; -import com.passus.config.validation.EnumValidator; -import com.passus.config.validation.Errors; -import com.passus.config.validation.LongValidator; -import com.passus.st.client.*; -import com.passus.st.client.FlowFilterNodeDefinitionCreator; -import com.passus.st.emitter.Emitter; -import com.passus.st.metric.MetricsContainer; -import com.passus.st.plugin.PluginConstants; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.*; - -import static com.passus.config.schema.ConfigurationSchemaBuilder.*; - - -/** - * @author Mirosław Hawrot - */ -@SuppressWarnings("ALL") -@Deprecated -@NodeDefinitionCreate(HttpClient.HttpClientNodeDefCreator.class) -@Plugin(name = HttpClient.TYPE, category = PluginConstants.CATEGORY_CLIENT) -public class HttpClient implements Client, FilterAware { - - public static final String TYPE = "http"; - - private static final int DEFAULT_WORKERS_NUM = 1; - - private static final boolean DEFAULT_CONNECT_PARTIAL_SESSION = false; - - private static final boolean DEFAULT_COLLECT_METRICS = false; - - private static final Logger LOGGER = LogManager.getLogger(HttpClient.class); - - private Emitter emitter; - - private HttpClientWorker[] workers; - - private long connCloseTimeout = 5_000; - - private final List<ClientListener> listeners = new ArrayList<>(); - - private FlowFilterChain filterChain = new FlowFilterChain(); - - private volatile boolean started = false; - - private float sleepFactor = HttpClientWorker.SLEEP_FACTOR_NO_SLEEP; - - private String wokerType = "synch"; - - private int workersNum = DEFAULT_WORKERS_NUM; - - private boolean collectMetrics = DEFAULT_COLLECT_METRICS; - - private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION; - - private HttpClientWorkerDispatcher dispatcher; - - public HttpClient() { - } - - public HttpClient(Emitter emitter) { - Assert.notNull(emitter, "emitter"); - this.emitter = emitter; - } - - @Override - public Emitter getEmitter() { - return emitter; - } - - @Override - public void setEmitter(Emitter emitter) { - this.emitter = emitter; - } - - public void setListeners(Collection<ClientListener> listeners) { - Assert.notContainsNull(listeners, "listeners"); - synchronized (listeners) { - this.listeners.addAll(listeners); - } - } - - public void addListener(ClientListener listener) { - synchronized (listeners) { - listeners.add(listener); - } - } - - public void removeListener(ClientListener listener) { - synchronized (listeners) { - listeners.remove(listener); - } - } - - public List<ClientListener> getListeners() { - synchronized (listeners) { - return Collections.unmodifiableList(listeners); - } - } - - @Override - public List<FlowFilter> getFilters() { - return filterChain.getFilters(); - } - - @Override - public void addFilter(FlowFilter filter) { - Assert.notNull(filter, "filter"); - filterChain.addFilter(filter); - } - - @Override - public void setFilters(Collection<FlowFilter> filters) { - Assert.notContainsNull(filters, "filters"); - filterChain.clear(); - filters.forEach((filter) -> filterChain.addFilter(filter)); - } - - public String getWokerType() { - return wokerType; - } - - public void setWokerType(String wokerType) { - Assert.notNull(wokerType, "wokerType"); - - if (!HttpClientWorkerFactory.getInstance().containsName(wokerType)) { - throw new IllegalArgumentException("Unknwon worker type '" + wokerType + "'."); - } - - this.wokerType = wokerType; - } - - public int getWorkersNum() { - return workersNum; - } - - public void setWorkersNum(int workersNum) { - Assert.greaterThanZero(workersNum, "workersNum"); - this.workersNum = workersNum; - } - - public float getSleepFactor() { - return sleepFactor; - } - - public void setSleepFactor(float sleepFactor) { - this.sleepFactor = sleepFactor; - } - - public HttpClientWorkerDispatcher getDispatcher() { - return dispatcher; - } - - public void setDispatcher(HttpClientWorkerDispatcher dispatcher) { - this.dispatcher = dispatcher; - } - - public int getActiveConnections() { - int count = 0; - for (HttpClientWorker worker : workers) { - count += worker.activeConnections(); - } - return count; - } - - @Override - public boolean isCollectMetrics() { - return collectMetrics; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - this.collectMetrics = collectMetrics; - if (workers != null) { - for (HttpClientWorker worker : workers) { - worker.setCollectMetrics(collectMetrics); - } - } - } - - public boolean isConnectPartialSession() { - return connectPartialSession; - } - - public void setConnectPartialSession(boolean connectPartialSession) { - this.connectPartialSession = connectPartialSession; - if (workers != null) { - for (HttpClientWorker worker : workers) { - worker.setConnectPartialSession(collectMetrics); - } - } - } - - @Override - public void writeMetrics(MetricsContainer container) { - if (collectMetrics && workers != null) { - for (HttpClientWorker worker : workers) { - worker.writeMetrics(container); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public void configure(Configuration config, ConfigurationContext context) { - setFilters((List<FlowFilter>) config.get("filters", Collections.EMPTY_LIST)); - setListeners((List<ClientListener>) config.get("listeners", Collections.EMPTY_LIST)); - setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM)); - connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION); - collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS); - dispatcher = (HttpClientWorkerDispatcher) config.get("dispatcher", null); - } - - @Override - public boolean isStarted() { - return started; - } - - public boolean isWorking() { - if (started) { - for (HttpClientWorker worker : workers) { - if (worker.isWorking()) { - return true; - } - } - } - - return false; - } - - @Override - public void join() throws InterruptedException { - for (HttpClientWorker worker : workers) { - worker.join(); - } - } - - @Override - public void start() { - if (started) { - return; - } - - if (emitter == null) { - throw new ServiceException("Emitter not set."); - } - - HttpClientWorkerFactory factory = HttpClientWorkerFactory.getInstance(); - String threadName = getClass().getSimpleName() + "-worker-"; - workers = new HttpClientWorker[workersNum]; - Class[] parameterTypes = {Emitter.class, String.class, Integer.TYPE}; - - for (int i = 0; i < workersNum; i++) { - Object[] initArgs = {emitter, threadName, i}; - HttpClientWorker worker = factory.getInstanceByName(wokerType, parameterTypes, initArgs); - if (worker == null) { - throw new ServiceException("Unable to create instance of worker '" + wokerType + "'"); - } - - worker.setListeners(listeners); - worker.setFilterChain(filterChain.instanceForWorker(i)); - -// if (worker instanceof HttpFlowBasedClientWorker) { -// ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor); -// } - - worker.setCollectMetrics(collectMetrics); - worker.setConnectPartialSession(connectPartialSession); - workers[i] = worker; - worker.start(); - } - - started = true; - } - - @Override - public void stop() { - if (!started) { - return; - } - - for (HttpClientWorker worker : workers) { - worker.close(); - worker.interrupt(); - - try { - worker.join(); - } catch (Exception ignore) { - } - } - - workers = null; - started = false; - } - - public void closeAllConnections() { - for (HttpClientWorker worker : workers) { - worker.close(); - } - } - - @Override - public void handle(Event event) { - if (!started) { - throw new IllegalArgumentException("Service not started."); - } - - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Event: {}", event); - } - - try { - if (dispatcher == null) { - for (HttpClientWorker worker : workers) { - worker.handle(event); - } - } else { - int index = dispatcher.dispatch(event, workers); - workers[index].handle(event); - } - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(e.getMessage(), e); - } - } - - } - - private static class DispatcherTransformer implements ValueTransformer { - - private static final DispatcherTransformer INSTANCE = new DispatcherTransformer(); - - @Override - public HttpClientWorkerDispatcher transform(Object obj, Errors errors, ConfigurationContext context) { - if (obj == null) { - return null; - } - if (obj instanceof String) { - String s = (String) obj; - switch (s) { - case "source": - return new HttpSourceNameAwareClientWorkerDispatcher(); - case "parallel": - return null; - default: - errors.reject("Unknown dispatcher type: " + s); - return null; - } - } - - errors.reject("Invalid type."); - return null; - } - - @Override - public Object reverseTransform(Object obj, Errors errors, ConfigurationContext context) { - throw new UnsupportedOperationException("Not supported yet."); - } - - } - - public static class HttpClientNodeDefCreator implements NodeDefinitionCreator { - - private static final Set<String> DISPATCHERS = new HashSet<>(Arrays.asList( - "source", "parallel" - )); - - @Override - public NodeDefinition create() { - return mapDef( - tupleDef("connectPartialSession", valueDefBool()).setRequired(false), - tupleDef("collectMetrics", valueDefBool()).setRequired(false), - tupleDef("workers", valueDefInteger() - .addValidator(LongValidator.GREATER_ZERO) - ).setRequired(false), - tupleDef("dispatcher", valueDef() - .addValidator(new EnumValidator(DISPATCHERS, false)) - .setTransformer(DispatcherTransformer.INSTANCE) - ).setRequired(false), - tupleDef("filters", - FlowFilterNodeDefinitionCreator.createFiltersList(true) - ).setRequired(false), - tupleDef("listeners", - new DynaKeyValueVaryListNodeDefinition("type", ClientListenerFactory.getInstance()) - .setTransformToPluginObject(true) - ).setRequired(false) - ); - } - - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientListener.java Fri May 31 11:37:28 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientListener.java Fri May 31 11:40:55 2019 +0200 @@ -8,10 +8,8 @@ /** * @author Mirosław Hawrot */ -@Deprecated public interface HttpClientListener extends ClientListener { - default void responseReceived(Object request, Object response, FlowContext context) { if (request instanceof HttpRequest && response instanceof HttpResponse) {
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,137 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.Assert; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.ClientListener; -import com.passus.st.client.Event; -import com.passus.st.client.FlowContext; -import com.passus.st.client.FlowFilterChain; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.metric.MetricSource; -import com.passus.st.metric.MetricsContainer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public abstract class HttpClientWorker extends Thread implements EmitterHandler, MetricSource { - - public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; - - protected final Logger logger = LogManager.getLogger(getClass()); - - protected final int index; - - private final List<ClientListener> listeners = new ArrayList<>(); - - protected FlowFilterChain filterChain = new FlowFilterChain(); - - protected final Emitter emitter; - - protected boolean collectMetric; - - protected HttpClientWorkerMetric metric; - - protected boolean connectPartialSession = false; - - public HttpClientWorker(Emitter emitter, String name, int index) { - super(name + index); - Assert.notNull(emitter, "emitter"); - this.emitter = emitter; - this.index = index; - } - - public boolean isConnectPartialSession() { - return connectPartialSession; - } - - public void setConnectPartialSession(boolean connectPartialSession) { - this.connectPartialSession = connectPartialSession; - } - - public abstract boolean isWorking(); - - public int index() { - return index; - } - - public FlowFilterChain filterChain() { - return filterChain; - } - - public void setFilterChain(FlowFilterChain filterChain) { - Assert.notNull(filterChain, "filterChain"); - this.filterChain = filterChain; - } - - protected final void fireResponseReceived(HttpRequest request, HttpResponse response, FlowContext context) { - for (ClientListener listener : listeners) { - listener.responseReceived(request, response, context); - } - } - - public Collection<ClientListener> listeners() { - return listeners; - } - - public void setListeners(Collection<ClientListener> listeners) { - this.listeners.clear(); - this.listeners.addAll(listeners); - } - - public void addListener(ClientListener listener) { - Assert.notNull(listener, "listener"); - this.listeners.add(listener); - } - - public void removeListener(ClientListener listener) { - Assert.notNull(listener, "listener"); - this.listeners.remove(listener); - } - - @Override - public boolean isCollectMetrics() { - return metric != null; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - if (collectMetrics && metric == null) { - metric = new HttpClientWorkerMetric(); - metric.activate(); - collectMetric = true; - } else if (!collectMetrics && metric != null) { - metric.deactivate(); - collectMetric = false; - metric = null; - } - } - - @Override - public void writeMetrics(MetricsContainer container) { - if (collectMetric) { - synchronized (metric) { - container.update(System.currentTimeMillis(), metric); - metric.reset(); - } - } - } - - public abstract int activeConnections(); - - public abstract void close(); - - public abstract void close(SessionInfo session); - - public abstract void handle(Event event); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.st.client.Event; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public interface HttpClientWorkerDispatcher { - - HttpClientWorker find(Event event, HttpClientWorker[] workers); - - int dispatch(Event event, HttpClientWorker[] workers); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerFactory.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.plugin.PluginFactory; -import com.passus.st.plugin.PluginConstants; - -/** - * @author Mirosław Hawrot - */ -@Deprecated -public class HttpClientWorkerFactory extends PluginFactory<HttpClientWorker> { - - private static HttpClientWorkerFactory instance; - - public HttpClientWorkerFactory() { - super(PluginConstants.CATEGORY_HTTP_CLIENT_WORKER, HttpClientWorker.class); - } - - public static synchronized HttpClientWorkerFactory getInstance() { - if (instance == null) { - instance = new HttpClientWorkerFactory(); - } - - return instance; - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSourceNameAwareClientWorkerDispatcher.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,41 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.st.client.Event; -import java.util.ArrayList; -import java.util.List; - -/** - * - * @author Mirosław Hawrot - */ -public class HttpSourceNameAwareClientWorkerDispatcher implements HttpClientWorkerDispatcher { - - private final List<String> sources = new ArrayList<>(4); - - @Override - public HttpClientWorker find(Event event, HttpClientWorker[] workers) { - int index = dispatch(event, workers); - return workers[index]; - } - - @Override - public int dispatch(Event event, HttpClientWorker[] workers) { - String sourceName = event.getSourceName(); - if (sourceName == null) { - throw new IllegalArgumentException("Event '" + event.getClass() + "' source name cannot be null."); - } - - int index = sources.indexOf(sourceName); - if (index == -1) { - if (sources.size() == workers.length) { - throw new IllegalArgumentException("Too low workers number."); - } - - sources.add(sourceName); - index = sources.size() - 1; - } - - return index; - } - -}
--- a/stress-tester/src/main/java/com/passus/st/config/ClientNodeDefinitionCreator.java Fri May 31 11:37:28 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,39 +0,0 @@ -package com.passus.st.config; - -import com.passus.commons.Assert; -import com.passus.commons.plugin.PluginFactory; -import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition; -import com.passus.config.schema.NodeDefinition; -import com.passus.config.schema.NodeDefinitionCreator; -import com.passus.st.client.Client; -import com.passus.st.client.ClientFactory; -import com.passus.st.utils.ConfigurationContextConsts; - -import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; -import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; - -public class ClientNodeDefinitionCreator implements NodeDefinitionCreator { - - private PluginFactory<Client> clientFactory = ClientFactory.getInstance(); - - public PluginFactory<Client> getClientFactory() { - return clientFactory; - } - - public void setClientFactory(PluginFactory<Client> clientFactory) { - Assert.notNull(clientFactory, "clientFactory"); - this.clientFactory = clientFactory; - } - - public NodeDefinition createClientsListDef() { - return new DynaKeyValueVaryListNodeDefinition("type", clientFactory) - .setTransformToPluginObject(true); - } - - @Override - public NodeDefinition create() { - return mapDef( - tupleDef("clients", createClientsListDef()) - ); - } -}
--- a/stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java Fri May 31 11:37:28 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/config/TestJobConfiguratorTest.java Fri May 31 11:40:55 2019 +0200 @@ -8,8 +8,7 @@ import com.passus.config.validation.Errors; import com.passus.filter.UnmutableValueExtractor; import com.passus.filter.ValueExtractor; -import com.passus.st.client.Client; -import com.passus.st.client.http.HttpClient; +import com.passus.st.client.FlowExecutor; import com.passus.st.source.EventSource; import com.passus.st.source.NcEventSource; import com.passus.st.source.PcapSessionEventSource; @@ -37,16 +36,15 @@ errors.getAllErrors().forEach(System.out::println); assertFalse(errors.hasError()); - List<Client> clients = (List<Client>) context.get(ConfigurationContextConsts.CLIENT_CLIENTS); - assertEquals(1, clients.size()); - assertTrue(clients.get(0) instanceof HttpClient); + FlowExecutor flowExecutor = context.get(ConfigurationContextConsts.FLOW_EXECUTOR); + assertNotNull(flowExecutor); - List<EventSource> sources = (List<EventSource>) context.get(ConfigurationContextConsts.EVENT_SOURCE_SOURCES); + List<EventSource> sources = context.get(ConfigurationContextConsts.EVENT_SOURCE_SOURCES); assertEquals(2, sources.size()); assertTrue(sources.get(0) instanceof PcapSessionEventSource); assertTrue(sources.get(1) instanceof NcEventSource); - Map<String, ValueExtractor> vars = (Map<String, ValueExtractor>) context.get(ConfigurationContextConsts.APP_VARS); + Map<String, ValueExtractor> vars = context.get(ConfigurationContextConsts.APP_VARS); assertEquals(1, vars.size()); UnmutableValueExtractor value = (UnmutableValueExtractor) vars.get("varName"); assertEquals("varValue", value.getValue());