changeset 959:773f0f4ff33b

Refactorization in progress
author Devel 2
date Thu, 30 May 2019 13:09:03 +0200
parents c15144d4da64
children b1a58efd0bf7
files stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SourceNameAwareFlowWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/job/JobListener.java stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java
diffstat 9 files changed, 459 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu May 30 10:26:34 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu May 30 13:09:03 2019 +0200
@@ -1,10 +1,12 @@
 package com.passus.st.client;
 
 import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.plugin.PluginConstants;
 
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -12,6 +14,7 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+@Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
 public class AsynchFlowWorker extends FlowWorkerBased {
 
     public static final String TYPE = "asynch";
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Thu May 30 13:09:03 2019 +0200
@@ -0,0 +1,365 @@
+package com.passus.st.client;
+
+import com.passus.commons.Assert;
+import com.passus.commons.service.Service;
+import com.passus.commons.service.ServiceException;
+import com.passus.config.Configurable;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.ValueTransformer;
+import com.passus.config.schema.DynaKeyValueVaryListNodeDefinition;
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
+import com.passus.config.validation.EnumValidator;
+import com.passus.config.validation.Errors;
+import com.passus.config.validation.LongValidator;
+import com.passus.st.client.http.HttpClient;
+import com.passus.st.client.http.HttpClientListenerFactory;
+import com.passus.st.client.http.filter.HttpFiltersNodeDefinitionCreator;
+import com.passus.st.emitter.Emitter;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.*;
+
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+
+public class FlowExecutor implements EventHandler, MetricSource, Service, Configurable {
+
+    private static final int DEFAULT_WORKERS_NUM = 1;
+
+    private static final boolean DEFAULT_CONNECT_PARTIAL_SESSION = false;
+
+    private static final boolean DEFAULT_COLLECT_METRICS = false;
+
+    private static final Logger LOGGER = LogManager.getLogger(FlowExecutor.class); // log under this class's own category, not HttpClient's
+
+    private Emitter emitter;
+
+    private FlowWorker[] workers;
+
+    private long connCloseTimeout = 5_000;
+
+    private ClientListener listener;
+
+    private FlowFilterChain filterChain = new FlowFilterChain();
+
+    private volatile boolean started = false;
+
+    //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP;
+
+    private String wokerType = "synch";
+
+    private int workersNum = DEFAULT_WORKERS_NUM;
+
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
+
+    private boolean connectPartialSession = DEFAULT_CONNECT_PARTIAL_SESSION;
+
+    private FlowWorkerDispatcher dispatcher;
+
+    public Emitter getEmitter() {
+        return emitter;
+    }
+
+    public void setEmitter(Emitter emitter) {
+        this.emitter = emitter;
+    }
+
+    public void setListener(ClientListener listener) {
+        this.listener = listener;
+    }
+
+    public ClientListener getListener() {
+        return listener;
+    }
+
+    public List<FlowFilter> getFilters() {
+        return filterChain.getFilters();
+    }
+
+    public void addFilter(FlowFilter filter) {
+        Assert.notNull(filter, "filter");
+        filterChain.addFilter(filter);
+    }
+
+    public void setFilters(Collection<FlowFilter> filters) {
+        Assert.notContainsNull(filters, "filters");
+        filterChain.clear();
+        filters.forEach((filter) -> filterChain.addFilter(filter));
+    }
+
+    public String getWokerType() {
+        return wokerType;
+    }
+
+    public void setWokerType(String wokerType) {
+        Assert.notNull(wokerType, "wokerType");
+        // Reject names FlowWorkerFactory does not contain; start() later instantiates the worker by this name.
+        if (!FlowWorkerFactory.getInstance().containsName(wokerType)) {
+            throw new IllegalArgumentException("Unknown worker type '" + wokerType + "'.");
+        }
+
+        this.wokerType = wokerType;
+    }
+
+    public int getWorkersNum() {
+        return workersNum;
+    }
+
+    public void setWorkersNum(int workersNum) {
+        Assert.greaterThanZero(workersNum, "workersNum");
+        this.workersNum = workersNum;
+    }
+
+/*    public float getSleepFactor() {
+        return sleepFactor;
+    }
+
+    public void setSleepFactor(float sleepFactor) {
+        this.sleepFactor = sleepFactor;
+    }*/
+
+    public FlowWorkerDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(FlowWorkerDispatcher dispatcher) {
+        Assert.notNull(dispatcher, "dispatcher");
+        this.dispatcher = dispatcher;
+    }
+
+    public int getActiveConnections() {
+        int count = 0;
+        for (FlowWorker worker : workers) {
+            count += worker.activeConnections();
+        }
+        return count;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+        if (workers != null) {
+            for (FlowWorker worker : workers) {
+                worker.setCollectMetrics(collectMetrics);
+            }
+        }
+    }
+
+    public boolean isConnectPartialSession() {
+        return connectPartialSession;
+    }
+
+    public void setConnectPartialSession(boolean connectPartialSession) {
+        this.connectPartialSession = connectPartialSession;
+        if (workers != null) { // workers exist only between start() and stop()
+            for (FlowWorker worker : workers) {
+                worker.setConnectPartialSession(connectPartialSession); // propagate this flag (previously passed collectMetrics by mistake)
+            }
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics && workers != null) {
+            for (FlowWorker worker : workers) {
+                worker.writeMetrics(container);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        setFilters((List<FlowFilter>) config.get("filters", Collections.EMPTY_LIST));
+        //setListeners((List<ClientListener>) config.get("listeners", Collections.EMPTY_LIST));
+        setWorkersNum(config.getInteger("workers", DEFAULT_WORKERS_NUM));
+        connectPartialSession = config.getBoolean("connectPartialSession", DEFAULT_CONNECT_PARTIAL_SESSION);
+        collectMetrics = config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS);
+        dispatcher = (FlowWorkerDispatcher) config.get("dispatcher", null);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    public boolean isWorking() {
+        if (started) {
+            for (FlowWorker worker : workers) {
+                if (worker.isWorking()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public void join() throws InterruptedException {
+        for (FlowWorker worker : workers) {
+            worker.join();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        if (emitter == null) {
+            throw new ServiceException("Emitter not set.");
+        }
+
+        FlowWorkerFactory factory = FlowWorkerFactory.getInstance();
+        String threadName = getClass().getSimpleName() + "-worker-";
+        workers = new FlowWorker[workersNum];
+        Class[] parameterTypes = {Emitter.class, String.class, Integer.TYPE};
+
+        for (int i = 0; i < workersNum; i++) {
+            Object[] initArgs = {emitter, threadName, i};
+            FlowWorker worker = factory.getInstanceByName(wokerType, parameterTypes, initArgs);
+            if (worker == null) {
+                throw new ServiceException("Unable to create instance of worker '" + wokerType + "'");
+            }
+
+            worker.setListener(listener);
+            worker.setFilterChain(filterChain.instanceForWorker(i));
+
+//            if (worker instanceof HttpFlowBasedClientWorker) {
+//                ((HttpFlowBasedClientWorker) worker).setSleepFactor(sleepFactor);
+//            }
+
+            worker.setCollectMetrics(collectMetrics);
+            worker.setConnectPartialSession(connectPartialSession);
+            workers[i] = worker;
+            worker.start();
+        }
+
+        started = true;
+    }
+
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        for (FlowWorker worker : workers) {
+            worker.close();
+            worker.interrupt();
+
+            try {
+                worker.join();
+            } catch (Exception ignore) {
+            }
+        }
+
+        workers = null;
+        started = false;
+    }
+
+    public void closeAllConnections() {
+        for (FlowWorker worker : workers) {
+            worker.close();
+        }
+    }
+
+    @Override
+    public void handle(Event event) {
+        if (!started) {
+            throw new IllegalArgumentException("Service not started.");
+        }
+
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Event: {}", event);
+        }
+
+        try {
+            if (dispatcher == null) {
+                for (FlowWorker worker : workers) {
+                    worker.handle(event);
+                }
+            } else {
+                int index = dispatcher.dispatch(event, workers);
+                workers[index].handle(event);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        }
+
+    }
+
+    private static class DispatcherTransformer implements ValueTransformer {
+
+        private static final DispatcherTransformer INSTANCE = new DispatcherTransformer();
+
+        @Override
+        public FlowWorkerDispatcher transform(Object obj, Errors errors, ConfigurationContext context) {
+            if (obj == null) {
+                return null;
+            }
+            if (obj instanceof String) {
+                String s = (String) obj;
+                switch (s) {
+                    case "source":
+                        return new SourceNameAwareFlowWorkerDispatcher();
+                    default:
+                        errors.reject("Unknown dispatcher type: " + s);
+                        return null;
+                }
+            }
+
+            errors.reject("Invalid type.");
+            return null;
+        }
+
+        @Override
+        public Object reverseTransform(Object obj, Errors errors, ConfigurationContext context) {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+    }
+
+    public static class HttpClientNodeDefCreator implements NodeDefinitionCreator {
+
+        private static final Set<String> DISPATCHERS = new HashSet<>(Arrays.asList(
+                "source", "parallel" // NOTE(review): DispatcherTransformer only maps "source"; "parallel" passes this set but is rejected there - confirm intent
+        ));
+
+        @Override
+        public NodeDefinition create() {
+            return mapDef(
+                    tupleDef("connectPartialSession", valueDefBool()).setRequired(false),
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false),
+                    tupleDef("workers", valueDefInteger()
+                            .addValidator(LongValidator.GREATER_ZERO)
+                    ).setRequired(false),
+                    tupleDef("dispatcher", valueDef()
+                            .addValidator(new EnumValidator(DISPATCHERS, false))
+                            .setTransformer(FlowExecutor.DispatcherTransformer.INSTANCE)
+                    ).setRequired(false),
+                    tupleDef("filters",
+                            HttpFiltersNodeDefinitionCreator.createFiltersList(true)
+                    ).setRequired(false),
+                    tupleDef("listeners",
+                            new DynaKeyValueVaryListNodeDefinition("type", HttpClientListenerFactory.getInstance())
+                                    .setTransformToPluginObject(true)
+                    ).setRequired(false)
+            );
+        }
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerDispatcher.java	Thu May 30 13:09:03 2019 +0200
@@ -0,0 +1,9 @@
+package com.passus.st.client;
+
+public interface FlowWorkerDispatcher {
+
+    FlowWorker find(Event event, FlowWorker[] workers);
+
+    int dispatch(Event event, FlowWorker[] workers);
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerFactory.java	Thu May 30 13:09:03 2019 +0200
@@ -0,0 +1,26 @@
+package com.passus.st.client;
+
+import com.passus.commons.plugin.PluginFactory;
+import com.passus.st.client.http.HttpClientWorker;
+import com.passus.st.plugin.PluginConstants;
+
+/**
+ * @author Mirosław Hawrot
+ */
+public class FlowWorkerFactory extends PluginFactory<FlowWorker> {
+
+    private static FlowWorkerFactory instance;
+
+    public FlowWorkerFactory() {
+        super(PluginConstants.CATEGORY_FLOW_WORKER, FlowWorker.class);
+    }
+
+    public static synchronized FlowWorkerFactory getInstance() {
+        if (instance == null) {
+            instance = new FlowWorkerFactory();
+        }
+
+        return instance;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu May 30 10:26:34 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu May 30 13:09:03 2019 +0200
@@ -1,16 +1,21 @@
 package com.passus.st.client;
 
 import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.plugin.PluginConstants;
 
 import java.util.*;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 
+@Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
 public class ParallelFlowWorker extends FlowWorkerBased {
 
+    public static final String TYPE = "parallel";
+
     public static final int DEFAULT_MAX_SENT_REQUESTS = 10;
 
     private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/SourceNameAwareFlowWorkerDispatcher.java	Thu May 30 13:09:03 2019 +0200
@@ -0,0 +1,36 @@
+package com.passus.st.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SourceNameAwareFlowWorkerDispatcher implements FlowWorkerDispatcher {
+
+    private final List<String> sources = new ArrayList<>(4);
+
+    @Override
+    public FlowWorker find(Event event, FlowWorker[] workers) {
+        int index = dispatch(event, workers);
+        return workers[index];
+    }
+
+    @Override
+    public int dispatch(Event event, FlowWorker[] workers) {
+        String sourceName = event.getSourceName();
+        if (sourceName == null) {
+            throw new IllegalArgumentException("Event '" + event.getClass() + "' source name cannot be null.");
+        }
+
+        int index = sources.indexOf(sourceName);
+        if (index == -1) {
+            if (sources.size() == workers.length) {
+                throw new IllegalArgumentException("Too low workers number.");
+            }
+
+            sources.add(sourceName);
+            index = sources.size() - 1;
+        }
+
+        return index;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu May 30 10:26:34 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu May 30 13:09:03 2019 +0200
@@ -1,14 +1,17 @@
 package com.passus.st.client;
 
 import com.passus.commons.Assert;
+import com.passus.commons.annotations.Plugin;
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.plugin.PluginConstants;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import static com.passus.st.client.FlowContext.contextStateToString;
 
+@Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER)
 public class SynchFlowWorker extends FlowWorkerBased {
 
     public static final String TYPE = "synch";
--- a/stress-tester/src/main/java/com/passus/st/job/JobListener.java	Thu May 30 10:26:34 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/job/JobListener.java	Thu May 30 13:09:03 2019 +0200
@@ -3,24 +3,23 @@
 import com.passus.commons.metric.MetricsCollection;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface JobListener {
 
-    public default void onJobRemoved(JobContext context) {
-        
-    }
-    
-    public default void onJobStatusChanged(JobContext context, JobStatus oldStatus) {
-        
+    default void onJobRemoved(JobContext context) {
+
     }
 
-    public default void onJobError(JobContext context, Throwable cause) {
-        
+    default void onJobStatusChanged(JobContext context, JobStatus oldStatus) {
+
     }
 
-    public default void onJobMetrics(JobContext context, MetricsCollection collection) {
-        
+    default void onJobError(JobContext context, Throwable cause) {
+
+    }
+
+    default void onJobMetrics(JobContext context, MetricsCollection collection) {
+
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Thu May 30 10:26:34 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/plugin/PluginConstants.java	Thu May 30 13:09:03 2019 +0200
@@ -25,6 +25,8 @@
 
     public static final String CATEGORY_HTTP_CLIENT_WORKER = "ClientWorker";
 
+    public static final String CATEGORY_FLOW_WORKER = "FlowWorker";
+
     public static final String CATEGORY_REPORTER_DESTINATION = "ReporterDestination";
 
     public static final String CATEGORY_GENERATOR = "Generator";