Mercurial > stress-tester
changeset 959:773f0f4ff33b
Refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu May 30 10:26:34 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu May 30 13:09:03 2019 +0200 @@ -1,10 +1,12 @@ package com.passus.st.client; import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; +import com.passus.st.plugin.PluginConstants; import java.util.Iterator; import java.util.LinkedList; @@ -12,6 +14,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +@Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) public class AsynchFlowWorker extends FlowWorkerBased { public static final String TYPE = "asynch";
// File: stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java (new file in this changeset)
package com.passus.st.client;

import com.passus.commons.Assert;
import com.passus.commons.service.Service;
import com.passus.commons.service.ServiceException;
import com.passus.config.Configurable;
import com.passus.config.Configuration;
import com.passus.config.ConfigurationContext;
import com.passus.config.ValueTransformer;
import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition;
import com.passus.config.schema.NodeDefinition;
import com.passus.config.schema.NodeDefinitionCreator;
import com.passus.config.validation.EnumValidator;
import com.passus.config.validation.Errors;
import com.passus.config.validation.LongValidator;
import com.passus.st.client.http.HttpClient;
import com.passus.st.client.http.HttpClientListenerFactory;
import com.passus.st.client.http.filter.HttpFiltersNodeDefinitionCreator;
import com.passus.st.emitter.Emitter;
import com.passus.st.metric.MetricSource;
import com.passus.st.metric.MetricsContainer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;

import static com.passus.config.schema.ConfigurationSchemaBuilder.*;

/**
 * Service that owns a pool of {@link FlowWorker}s and forwards events to them.
 *
 * <p>On {@link #start()} it creates {@code workersNum} workers of type {@code wokerType} through
 * {@link FlowWorkerFactory}. Each handled {@link Event} is passed either to every worker (when no
 * dispatcher is set) or to the single worker selected by the configured
 * {@link FlowWorkerDispatcher}.
 */
public class FlowExecutor implements EventHandler, MetricSource, Service, Configurable {

    private static final int DEFAULT_WORKERS_NUM = 1;

    private static final boolean DEFAULT_CONNECT_PARTIAL_SESSION = false;

    private static final boolean DEFAULT_COLLECT_METRICS = false;

    // Logger is bound to this class (it was previously created for HttpClient by mistake).
    private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class);

    private Emitter emitter;

    /** Workers created by {@link #start()}; {@code null} while the service is not started. */
    private FlowWorker[] workers;

    // NOTE(review): not referenced anywhere else in this class.
    private long connCloseTimeout = 5_000;

    private ClientListener listener;

    private FlowFilterChain filterChain = new FlowFilterChain();

    private volatile boolean started = false;

    //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP;

    private String wokerType = "synch";

    private int workersNum = DEFAULT_WORKERS_NUM;

    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;

    private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION;

    private FlowWorkerDispatcher dispatcher;

    public Emitter getEmitter() {
        return emitter;
    }

    public void setEmitter(Emitter emitter) {
        this.emitter = emitter;
    }

    public void setListener(ClientListener listener) {
        this.listener = listener;
    }

    public ClientListener getListener() {
        return listener;
    }

    public List<FlowFilter> getFilters() {
        return filterChain.getFilters();
    }

    /**
     * Appends a filter to the filter chain.
     *
     * @param filter filter to add; must not be {@code null}
     */
    public void addFilter(FlowFilter filter) {
        Assert.notNull(filter, "filter");
        filterChain.addFilter(filter);
    }

    /**
     * Replaces the content of the filter chain with the given filters.
     *
     * @param filters new filters; must not contain {@code null}
     */
    public void setFilters(Collection<FlowFilter> filters) {
        Assert.notContainsNull(filters, "filters");
        filterChain.clear();
        filters.forEach((filter) -> filterChain.addFilter(filter));
    }

    public String getWokerType() {
        return wokerType;
    }

    /**
     * Sets the name of the worker plugin instantiated by {@link #start()}.
     *
     * @param wokerType plugin name; must be known to {@link FlowWorkerFactory}
     * @throws IllegalArgumentException if the factory does not contain the given name
     */
    public void setWokerType(String wokerType) {
        Assert.notNull(wokerType, "wokerType");

        if (!FlowWorkerFactory.getInstance().containsName(wokerType)) {
            throw new IllegalArgumentException("Unknown worker type '" + wokerType + "'.");
        }

        this.wokerType = wokerType;
    }

    public int getWorkersNum() {
        return workersNum;
    }

    public void setWorkersNum(int workersNum) {
        Assert.greaterThanZero(workersNum, "workersNum");
        this.workersNum = workersNum;
    }

/*    public float getSleepFactor() {
        return sleepFactor;
    }

    public void setSleepFactor(float sleepFactor) {
        this.sleepFactor = sleepFactor;
    }*/

    public FlowWorkerDispatcher getDispatcher() {
        return dispatcher;
    }

    public void setDispatcher(FlowWorkerDispatcher dispatcher) {
        Assert.notNull(dispatcher, "dispatcher");
        this.dispatcher = dispatcher;
    }

    /**
     * Sums {@code activeConnections()} over all workers.
     *
     * @return total reported by the workers, or {@code 0} when no workers exist (not started)
     */
    public int getActiveConnections() {
        FlowWorker[] current = workers;
        if (current == null) {
            return 0;
        }

        int count = 0;
        for (FlowWorker worker : current) {
            count += worker.activeConnections();
        }
        return count;
    }

    @Override
    public boolean isCollectMetrics() {
        return collectMetrics;
    }

    /**
     * Stores the flag and, when workers already exist, propagates it to each of them.
     */
    @Override
    public void setCollectMetrics(boolean collectMetrics) {
        this.collectMetrics = collectMetrics;
        if (workers != null) {
            for (FlowWorker worker : workers) {
                worker.setCollectMetrics(collectMetrics);
            }
        }
    }

    public boolean isConnectPartialSession() {
        return connectPartialSession;
    }

    /**
     * Stores the flag and, when workers already exist, propagates it to each of them.
     */
    public void setConnectPartialSession(boolean connectPartialSession) {
        this.connectPartialSession = connectPartialSession;
        if (workers != null) {
            for (FlowWorker worker : workers) {
                // Propagate the value being set (previously collectMetrics was passed by mistake).
                worker.setConnectPartialSession(connectPartialSession);
            }
        }
    }

    /**
     * Lets every worker write its metrics into the container; does nothing when metric
     * collection is disabled or no workers exist.
     */
    @Override
    public void writeMetrics(MetricsContainer container) {
        if (collectMetrics && workers != null) {
            for (FlowWorker worker : workers) {
                worker.writeMetrics(container);
            }
        }
    }

    /**
     * Reads "filters", "workers", "connectPartialSession", "collectMetrics" and "dispatcher"
     * from the configuration, falling back to the class defaults for missing entries.
     *
     * <p>NOTE(review): the schema in {@link HttpClientNodeDefCreator} also declares "listeners",
     * which is not consumed here (the call is commented out) - confirm this is intentional.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(Configuration config, ConfigurationContext context) {
        setFilters((List<FlowFilter>) config.get("filters", Collections.EMPTY_LIST));
        //setListeners((List<ClientListener>) config.get("listeners", Collections.EMPTY_LIST));
        setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM));
        connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION);
        collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
        dispatcher = (FlowWorkerDispatcher) config.get("dispatcher", null);
    }

    @Override
    public boolean isStarted() {
        return started;
    }

    /**
     * @return {@code true} if the service is started and at least one worker reports
     *         {@code isWorking()}
     */
    public boolean isWorking() {
        if (!started) {
            return false;
        }

        FlowWorker[] current = workers;
        if (current == null) {
            return false;
        }

        for (FlowWorker worker : current) {
            if (worker.isWorking()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Calls {@code join()} on every worker in turn; returns immediately when no workers exist.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        FlowWorker[] current = workers;
        if (current == null) {
            return;
        }

        for (FlowWorker worker : current) {
            worker.join();
        }
    }

    /**
     * Creates, configures and starts {@code workersNum} workers. Calling it on an already
     * started service is a no-op.
     *
     * @throws ServiceException if no emitter is set or a worker instance cannot be created
     */
    @Override
    public void start() {
        if (started) {
            return;
        }

        if (emitter == null) {
            throw new ServiceException("Emitter not set.");
        }

        FlowWorkerFactory factory = FlowWorkerFactory.getInstance();
        String threadName = getClass().getSimpleName() + "-worker-";
        workers = new FlowWorker[workersNum];
        // Constructor signature looked up by the factory: (Emitter, String, int).
        Class[] parameterTypes = {Emitter.class, String.class, Integer.TYPE};

        for (int i = 0; i < workersNum; i++) {
            Object[] initArgs = {emitter, threadName, i};
            FlowWorker worker = factory.getInstanceByName(wokerType, parameterTypes, initArgs);
            if (worker == null) {
                throw new ServiceException("Unable to create instance of worker '" + wokerType + "'");
            }

            worker.setListener(listener);
            worker.setFilterChain(filterChain.instanceForWorker(i));

//            if (worker instanceof HttpFlowBasedClientWorker) {
//                ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor);
//            }

            worker.setCollectMetrics(collectMetrics);
            worker.setConnectPartialSession(connectPartialSession);
            workers[i] = worker;
            worker.start();
        }

        started = true;
    }

    /**
     * Closes, interrupts and joins every worker, then discards them. Calling it on a service
     * that is not started is a no-op.
     *
     * <p>Shutdown is best-effort: a failure while joining one worker does not prevent the
     * remaining workers from being stopped. If the calling thread was interrupted while
     * joining, its interrupt status is restored before returning.
     */
    @Override
    public void stop() {
        if (!started) {
            return;
        }

        boolean interrupted = false;
        for (FlowWorker worker : workers) {
            worker.close();
            worker.interrupt();

            try {
                worker.join();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Remember the interrupt but keep going so the remaining workers are stopped too.
                    interrupted = true;
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(e.getMessage(), e);
                }
            }
        }

        workers = null;
        started = false;

        if (interrupted) {
            // Restore the flag only after the loop; setting it earlier would make later joins fail at once.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Calls {@code close()} on every worker; does nothing when no workers exist.
     */
    public void closeAllConnections() {
        FlowWorker[] current = workers;
        if (current == null) {
            return;
        }

        for (FlowWorker worker : current) {
            worker.close();
        }
    }

    /**
     * Forwards the event to all workers, or to the worker picked by the dispatcher when one is
     * set. Exceptions raised while forwarding are logged at debug level and not rethrown.
     *
     * @throws IllegalArgumentException if the service is not started
     */
    @Override
    public void handle(Event event) {
        if (!started) {
            // NOTE(review): IllegalStateException would describe this better; type kept for callers.
            throw new IllegalArgumentException("Service not started.");
        }

        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Event: {}", event);
        }

        try {
            if (dispatcher == null) {
                for (FlowWorker worker : workers) {
                    worker.handle(event);
                }
            } else {
                int index = dispatcher.dispatch(event, workers);
                workers[index].handle(event);
            }
        } catch (Exception e) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(e.getMessage(), e);
            }
        }

    }

    /**
     * Converts the configured dispatcher name into a {@link FlowWorkerDispatcher} instance.
     * Only "source" is recognised; any other string or a non-string value is rejected.
     */
    private static class DispatcherTransformer implements ValueTransformer {

        private static final DispatcherTransformer INSTANCE = new DispatcherTransformer();

        @Override
        public FlowWorkerDispatcher transform(Object obj, Errors errors, ConfigurationContext context) {
            if (obj == null) {
                return null;
            }
            if (obj instanceof String) {
                String s = (String) obj;
                switch (s) {
                    case "source":
                        return new SourceNameAwareFlowWorkerDispatcher();
                    default:
                        errors.reject("Unknown dispatcher type: " + s);
                        return null;
                }
            }

            errors.reject("Invalid type.");
            return null;
        }

        @Override
        public Object reverseTransform(Object obj, Errors errors, ConfigurationContext context) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

    }

    /**
     * Builds the configuration schema for this executor: optional "connectPartialSession",
     * "collectMetrics", "workers", "dispatcher", "filters" and "listeners" entries.
     */
    public static class HttpClientNodeDefCreator implements NodeDefinitionCreator {

        // NOTE(review): "parallel" is accepted here but DispatcherTransformer only handles
        // "source", so "parallel" ends up rejected as an unknown dispatcher type - confirm intent.
        private static final Set<String> DISPATCHERS = new HashSet<>(Arrays.asList(
                "source", "parallel"
        ));

        @Override
        public NodeDefinition create() {
            return mapDef(
                    tupleDef("connectPartialSession", valueDefBool()).setRequired(false),
                    tupleDef("collectMetrics", valueDefBool()).setRequired(false),
                    tupleDef("workers", valueDefInteger()
                            .addValidator(LongValidator.GREATER_ZERO)
                    ).setRequired(false),
                    tupleDef("dispatcher", valueDef()
                            .addValidator(new EnumValidator(DISPATCHERS, false))
                            .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
                    ).setRequired(false),
                    tupleDef("filters",
                            HttpFiltersNodeDefinitionCreator.createFiltersList(true)
                    ).setRequired(false),
                    tupleDef("listeners",
                            new DynaKeyValueVaryListNodeDefinition("type", HttpClientListenerFactory.getInstance())
                                    .setTransformToPluginObject(true)
                    ).setRequired(false)
            );
        }

    }

}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerDispatcher.java Thu May 30 13:09:03 2019 +0200 @@ -0,0 +1,9 @@ +package com.passus.st.client; + +public interface FlowWorkerDispatcher { + + FlowWorker find(Event event, FlowWorker[] workers); + + int dispatch(Event event, FlowWorker[] workers); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java Thu May 30 13:09:03 2019 +0200 @@ -0,0 +1,26 @@ +package com.passus.st.client; + +import com.passus.commons.plugin.PluginFactory; +import com.passus.st.client.http.HttpClientWorker; +import com.passus.st.plugin.PluginConstants; + +/** + * @author Mirosław Hawrot + */ +public class FlowWorkerFactory extends PluginFactory<FlowWorker> { + + private static FlowWorkerFactory instance; + + public FlowWorkerFactory() { + super(PluginConstants.CATEGORY_FLOW_WORKER, FlowWorker.class); + } + + public static synchronized FlowWorkerFactory getInstance() { + if (instance == null) { + instance = new FlowWorkerFactory(); + } + + return instance; + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu May 30 10:26:34 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu May 30 13:09:03 2019 +0200 @@ -1,16 +1,21 @@ package com.passus.st.client; import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; +import com.passus.st.plugin.PluginConstants; import java.util.*; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; +@Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) public class ParallelFlowWorker extends FlowWorkerBased { + public static final String TYPE = "parallel"; + public static final int DEFAULT_MAX_SENT_REQUESTS = 10; private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
// File: stress-tester/src/main/java/com/passus/st/client/SourceNameAwareFlowWorkerDispatcher.java (new file in this changeset)
package com.passus.st.client;

import java.util.ArrayList;
import java.util.List;

/**
 * Dispatcher that routes events by source name: each distinct source name is assigned the next
 * free worker index the first time it is seen, and keeps that index afterwards.
 */
public class SourceNameAwareFlowWorkerDispatcher implements FlowWorkerDispatcher {

    /** Source names in order of first appearance; a name's position is its worker index. */
    private final List<String> sources = new ArrayList<>(4);

    /**
     * Returns the worker at the index computed by {@link #dispatch(Event, FlowWorker[])}.
     */
    @Override
    public FlowWorker find(Event event, FlowWorker[] workers) {
        return workers[dispatch(event, workers)];
    }

    /**
     * Maps the event's source name to a worker index, registering the name if it is new.
     *
     * @param event   event whose source name selects the worker
     * @param workers available workers; only the array length is consulted
     * @return index assigned to the event's source name
     * @throws IllegalArgumentException if the source name is {@code null}, or if the name is new
     *                                  and as many names as there are workers are already registered
     */
    @Override
    public int dispatch(Event event, FlowWorker[] workers) {
        final String name = event.getSourceName();
        if (name == null) {
            throw new IllegalArgumentException("Event '" + event.getClass() + "' source name cannot be null.");
        }

        final int known = sources.indexOf(name);
        if (known != -1) {
            return known;
        }

        if (sources.size() == workers.length) {
            throw new IllegalArgumentException("Too low workers number.");
        }

        // A new name takes the slot at the end of the list, i.e. the current size.
        final int assigned = sources.size();
        sources.add(name);
        return assigned;
    }

}
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu May 30 10:26:34 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu May 30 13:09:03 2019 +0200 @@ -1,14 +1,17 @@ package com.passus.st.client; import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; +import com.passus.st.plugin.PluginConstants; import java.io.IOException; import java.util.concurrent.LinkedBlockingDeque; import static com.passus.st.client.FlowContext.contextStateToString; +@Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) public class SynchFlowWorker extends FlowWorkerBased { public static final String TYPE = "synch";
--- a/stress-tester/src/main/java/com/passus/st/job/JobListener.java Thu May 30 10:26:34 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/job/JobListener.java Thu May 30 13:09:03 2019 +0200 @@ -3,24 +3,23 @@ import com.passus.commons.metric.MetricsCollection; /** - * * @author Mirosław Hawrot */ public interface JobListener { - public default void onJobRemoved(JobContext context) { - - } - - public default void onJobStatusChanged(JobContext context, JobStatus oldStatus) { - + default void onJobRemoved(JobContext context) { + } - public default void onJobError(JobContext context, Throwable cause) { - + default void onJobStatusChanged(JobContext context, JobStatus oldStatus) { + } - public default void onJobMetrics(JobContext context, MetricsCollection collection) { - + default void onJobError(JobContext context, Throwable cause) { + + } + + default void onJobMetrics(JobContext context, MetricsCollection collection) { + } }
--- a/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java Thu May 30 10:26:34 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java Thu May 30 13:09:03 2019 +0200 @@ -25,6 +25,8 @@ public static final String CATEGORY_HTTP_CLIENT_WORKER = "ClientWorker"; + public static final String CATEGORY_FLOW_WORKER = "FlowWorker"; + public static final String CATEGORY_REPORTER_DESTINATION = "ReporterDestination"; public static final String CATEGORY_GENERATOR = "Generator";