changeset 1062:46ebffd7b305

Metrics improvements + bugfixes
author Devel 2
date Fri, 17 Apr 2020 11:14:07 +0200
parents b79ecce8df37
children 6fcaacd4fa38
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/FlowMetric.java stress-tester/src/main/java/com/passus/st/client/FlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/PerNameMetricsContainer.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerMetric.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java stress-tester/src/main/java/com/passus/st/client/http/HttpMetric.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlMetric.java stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/metric/MetricsBatchTimeWindow.java stress-tester/src/main/java/com/passus/st/metric/MetricsContainer.java stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java
diffstat 17 files changed, 208 insertions(+), 116 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java	Fri Apr 17 11:14:07 2020 +0200
@@ -52,7 +52,7 @@
 
     @Override
     public boolean isCollectMetrics() {
-        return flowWorker.collectMetric;
+        return flowWorker.collectMetrics;
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Fri Apr 17 11:14:07 2020 +0200
@@ -8,7 +8,7 @@
 import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
 import com.passus.st.client.netflow.NetflowFlowHandler;
-import com.passus.st.client.pgsql.PgSqlFlowHandlerNetflowFlowHandler;
+import com.passus.st.client.pgsql.PgSqlFlowHandler;
 
 import static com.passus.st.Protocols.*;
 
@@ -24,7 +24,7 @@
             case NETFLOW:
                 return new NetflowFlowHandler();
             case PGSQL:
-                return new PgSqlFlowHandlerNetflowFlowHandler();
+                return new PgSqlFlowHandler();
 
         }
 
@@ -44,7 +44,7 @@
         } else if (message instanceof Netflow) {
             return new NetflowFlowHandler();
         } else if (message instanceof PgSqlMessage) {
-            return new PgSqlFlowHandlerNetflowFlowHandler();
+            return new PgSqlFlowHandler();
         }
 
         throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'.");
--- a/stress-tester/src/main/java/com/passus/st/client/FlowMetric.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowMetric.java	Fri Apr 17 11:14:07 2020 +0200
@@ -7,7 +7,7 @@
 
 public class FlowMetric extends GenericMetric {
 
-    public static final String FLOW_METRIC = "flow";
+    public static final String FLOW_METRIC = "Flow";
 
     private final LongHistogramMetric requestSendingTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java	Fri Apr 17 11:14:07 2020 +0200
@@ -31,7 +31,7 @@
 
     protected FlowHandlerFactory clientFactory = new FlowHandlerFactoryImpl();
 
-    protected boolean collectMetric;
+    protected boolean collectMetrics;
 
     protected FlowMetric metric;
 
@@ -155,10 +155,10 @@
         if (collectMetrics && metric == null) {
             metric = new FlowMetric();
             metric.activate();
-            collectMetric = true;
+            this.collectMetrics = true;
         } else if (!collectMetrics && metric != null) {
             metric.deactivate();
-            collectMetric = false;
+            this.collectMetrics = false;
             metric = null;
         }
     }
@@ -176,7 +176,7 @@
 
     @Override
     public void writeMetrics(MetricsContainer container) {
-        if (collectMetric) {
+        if (collectMetrics) {
             synchronized (metric) {
                 container.update(System.currentTimeMillis(), metric);
                 metric.reset();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Fri Apr 17 11:14:07 2020 +0200
@@ -504,7 +504,7 @@
                 }
 
                 if (decoder.state() == DataDecoder.STATE_ERROR) {
-                    if (collectMetric) {
+                    if (collectMetrics) {
                         synchronized (metric) {
                             metric.incErrorNum();
                         }
@@ -517,7 +517,7 @@
                     decoder.clear(flowContext);
                     responseReceived0(flowContext, null);
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                    if (collectMetric) {
+                    if (collectMetrics) {
                         synchronized (metric) {
                             metric.incResponsesNum();
                             metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
@@ -542,7 +542,7 @@
                     responseReceived0(flowContext, resp);
                 }
             } catch (Exception e) {
-                if (collectMetric) {
+                if (collectMetrics) {
                     synchronized (metric) {
                         metric.incErrorNum();
                     }
@@ -581,7 +581,7 @@
         try {
             if (flowContext.isEventSent()) {
                 long now = timeGenerator.currentTimeMillis();
-                if (collectMetric) {
+                if (collectMetrics) {
                     synchronized (metric) {
                         metric.addRequestSendingTime(now - flowContext.sendStartTimestamp());
                     }
@@ -668,7 +668,7 @@
                     return;
                 }
 
-                if (collectMetric) {
+                if (collectMetrics) {
                     synchronized (metric) {
                         metric.incRequestsNum();
                         metric.addRequestSize(flowContext.buffer().readableBytes());
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/PerNameMetricsContainer.java	Fri Apr 17 11:14:07 2020 +0200
@@ -0,0 +1,46 @@
+package com.passus.st.client;
+
+import com.passus.commons.metric.Metric;
+import com.passus.st.metric.MetricsContainer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PerNameMetricsContainer implements MetricsContainer {
+
+    private Map<String, Metric> metrics = new HashMap<>();
+
+    @Override
+    public int size() {
+        return metrics.size();
+    }
+
+    public Collection<Metric> getMetrics() {
+        return metrics.values();
+    }
+
+    @Override
+    public void clear() {
+        metrics.clear();
+    }
+
+    @Override
+    public boolean update(long time, Metric metric) {
+        Metric rangeMetric = metrics.get(metric.getName());
+        if (rangeMetric == null) {
+            synchronized (metric) {
+                rangeMetric = metric.copy();
+            }
+
+            metrics.put(metric.getName(), rangeMetric);
+        } else {
+            synchronized (metric) {
+                rangeMetric.update(metric);
+            }
+        }
+
+        return true;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri Apr 17 11:14:07 2020 +0200
@@ -33,6 +33,8 @@
 
     private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>();
 
+    private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer();
+
     private long eventsQueueWaitTime = 100;
 
     private int loop = 0;
@@ -87,6 +89,15 @@
                 f.client.writeMetrics(container);
             }
         });
+
+        if (!deferredMetrics.isEmpty()) {
+            deferredMetrics.getMetrics().forEach(m -> {
+                container.update(m);
+                m.reset();
+            });
+
+            deferredMetrics.clear();
+        }
     }
 
     protected FlowContext register(SessionEvent sessionEvent) {
@@ -103,7 +114,7 @@
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
-        client.setCollectMetrics(collectMetric);
+        client.setCollectMetrics(collectMetrics);
         flowContext.client(client);
         sessions.put(session, flowContext);
         return flowContext;
@@ -313,6 +324,10 @@
 
         @Override
         public void onDisconnected(FlowContext flowContext) {
+            if (collectMetrics) {
+                flowContext.client.writeMetrics(deferredMetrics);
+            }
+
             sessions.remove(flowContext.session);
         }
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerMetric.java	Fri Apr 17 11:13:20 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.commons.metric.LongHistogramMetric;
-import com.passus.commons.metric.LongHistogramMetricImpl;
-import com.passus.commons.metric.Metric;
-import com.passus.st.client.GenericMetric;
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * @author Mirosław Hawrot
- */
-public class HttpClientWorkerMetric extends GenericMetric {
-
-    public static final String DEFAULT_NAME = "HTTP";
-
-    private final TreeMap<Integer, MutableInt> respStatuses = new TreeMap<>();
-
-    private final LongHistogramMetric responseTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
-
-    private final LongHistogramMetric responseSizeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
-
-    public HttpClientWorkerMetric() {
-        this(DEFAULT_NAME);
-    }
-
-    public HttpClientWorkerMetric(String name) {
-        super(name);
-        attrs.put("responseStatusCodes", respStatuses);
-        attrs.put("responseTimeDist", responseTimeHistogram);
-        attrs.put("responseSizeHistogram", responseSizeHistogram);
-
-    }
-
-    @Override
-    protected void doActivate() {
-        super.doActivate();
-        responseTimeHistogram.activate();
-        responseSizeHistogram.activate();
-    }
-
-    @Override
-    protected void doDeactivate() {
-        super.doDeactivate();
-        responseTimeHistogram.deactivate();
-        responseSizeHistogram.deactivate();
-    }
-
-    public Map<Integer, MutableInt> getStatusCodes() {
-        return respStatuses;
-    }
-
-    public void addResponseStatusCode(Integer statusCode) {
-        Metric.incrementCountMap(statusCode, respStatuses);
-    }
-
-    public void addResponseTime(long time) {
-        responseTimeHistogram.update(time);
-    }
-
-    public void addResponseSize(long size) {
-        responseSizeHistogram.update(size);
-    }
-
-    @Override
-    public void reset() {
-        respStatuses.clear();
-        responseTimeHistogram.reset();
-        responseSizeHistogram.reset();
-    }
-
-    @Override
-    public void update(Metric metric) {
-        HttpClientWorkerMetric clientMetric = (HttpClientWorkerMetric) metric;
-        Metric.updateCountMap(respStatuses, clientMetric.respStatuses);
-        responseTimeHistogram.update(clientMetric.responseTimeHistogram);
-        responseSizeHistogram.update(clientMetric.responseSizeHistogram);
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java	Fri Apr 17 11:14:07 2020 +0200
@@ -22,7 +22,7 @@
 
     boolean collectMetrics = false;
 
-    HttpClientWorkerMetric metric;
+    HttpMetric metric;
 
     HttpScopes scopes;
 
@@ -73,7 +73,7 @@
     public void setCollectMetrics(boolean collectMetrics) {
         this.collectMetrics = collectMetrics;
         if (collectMetrics) {
-            metric = new HttpClientWorkerMetric();
+            metric = new HttpMetric();
             synchronized (metric) {
                 metric.activate();
             }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpMetric.java	Fri Apr 17 11:14:07 2020 +0200
@@ -0,0 +1,82 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.metric.LongHistogramMetric;
+import com.passus.commons.metric.LongHistogramMetricImpl;
+import com.passus.commons.metric.Metric;
+import com.passus.st.client.GenericMetric;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * @author Mirosław Hawrot
+ */
+public class HttpMetric extends GenericMetric {
+
+    public static final String DEFAULT_NAME = "HTTP";
+
+    private final TreeMap<Integer, MutableInt> respStatuses = new TreeMap<>();
+
+    private final LongHistogramMetric responseTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
+
+    private final LongHistogramMetric responseSizeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
+
+    public HttpMetric() {
+        this(DEFAULT_NAME);
+    }
+
+    public HttpMetric(String name) {
+        super(name);
+        attrs.put("responseStatusCodes", respStatuses);
+        attrs.put("responseTimeDist", responseTimeHistogram);
+        attrs.put("responseSizeHistogram", responseSizeHistogram);
+
+    }
+
+    @Override
+    protected void doActivate() {
+        super.doActivate();
+        responseTimeHistogram.activate();
+        responseSizeHistogram.activate();
+    }
+
+    @Override
+    protected void doDeactivate() {
+        super.doDeactivate();
+        responseTimeHistogram.deactivate();
+        responseSizeHistogram.deactivate();
+    }
+
+    public Map<Integer, MutableInt> getStatusCodes() {
+        return respStatuses;
+    }
+
+    public void addResponseStatusCode(Integer statusCode) {
+        Metric.incrementCountMap(statusCode, respStatuses);
+    }
+
+    public void addResponseTime(long time) {
+        responseTimeHistogram.update(time);
+    }
+
+    public void addResponseSize(long size) {
+        responseSizeHistogram.update(size);
+    }
+
+    @Override
+    public void reset() {
+        respStatuses.clear();
+        responseTimeHistogram.reset();
+        responseSizeHistogram.reset();
+    }
+
+    @Override
+    public void update(Metric metric) {
+        HttpMetric clientMetric = (HttpMetric) metric;
+        Metric.updateCountMap(respStatuses, clientMetric.respStatuses);
+        responseTimeHistogram.update(clientMetric.responseTimeHistogram);
+        responseSizeHistogram.update(clientMetric.responseSizeHistogram);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlMetric.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlMetric.java	Fri Apr 17 11:14:07 2020 +0200
@@ -1,7 +1,11 @@
 package com.passus.st.client.pgsql;
 
+import com.passus.commons.metric.Metric;
 import com.passus.st.client.DbMetric;
 import com.passus.st.utils.DbUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.HashMap;
 
 public class PgSqlMetric extends DbMetric {
 
@@ -9,12 +13,32 @@
 
     private final StringBuilder sb = new StringBuilder();
 
+    private final HashMap<String, MutableInt> errorCodesCount = new HashMap<>();
+
+    public PgSqlMetric() {
+        this(DEFAULT_NAME);
+    }
+
     public PgSqlMetric(String name) {
         super(name);
+        attrs.put("errorCodes", errorCodesCount);
     }
 
-    public PgSqlMetric() {
-        super(DEFAULT_NAME);
+    public void addErrorCode(String status) {
+        Metric.incrementCountMap(status, errorCodesCount);
+    }
+
+    @Override
+    public void update(Metric metric) {
+        super.update(metric);
+        PgSqlMetric dbMetric = (PgSqlMetric) metric;
+        Metric.updateCountMap(errorCodesCount, dbMetric.errorCodesCount);
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        errorCodesCount.clear();
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Fri Apr 17 11:14:07 2020 +0200
@@ -19,7 +19,7 @@
  */
 public class EmitterMetric implements Metric {
 
-    public static final String DEFAULT_NAME = "emitter";
+    public static final String DEFAULT_NAME = "Emitter";
 
     private final Map<String, Serializable> attrs = new LinkedHashMap<>();
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Fri Apr 17 11:14:07 2020 +0200
@@ -8,7 +8,6 @@
 import com.passus.st.metric.MetricsContainer;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -123,8 +122,7 @@
                     logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
                 }
 
-                throw new ConnectException("Connection timed out.");
-
+                throw new IOException("Connection timed out (" + connectionTimeout + "ms).");
             }
         } catch (Exception e) {
             doCatchException(key, e);
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsBatchTimeWindow.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsBatchTimeWindow.java	Fri Apr 17 11:14:07 2020 +0200
@@ -1,11 +1,11 @@
 package com.passus.st.metric;
 
 import com.passus.commons.metric.Metric;
+
 import java.util.ArrayDeque;
 import java.util.List;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 class MetricsBatchTimeWindow implements MetricsContainer {
@@ -34,6 +34,12 @@
         return period;
     }
 
+    @Override
+    public int size() {
+        return batch.size();
+    }
+
+    @Override
     public void clear() {
         batch.clear();
     }
@@ -186,8 +192,5 @@
         return results;
     }
 
-    public int size() {
-        return batch.size();
-    }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsContainer.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsContainer.java	Fri Apr 17 11:14:07 2020 +0200
@@ -11,6 +11,14 @@
         return update(System.currentTimeMillis(), metric);
     }
 
+    default boolean isEmpty() {
+        return size() == 0;
+    }
+
+    void clear();
+
+    int size();
+
     boolean update(long time, Metric metric);
 
 }
--- a/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java	Fri Apr 17 11:14:07 2020 +0200
@@ -6,20 +6,19 @@
 import java.net.SocketException;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class NetExceptionsCategory {
 
     public static final String CONNECTION_REFUSED = "connection.refused";
-    public static final String CONNECTION_TIMEDOUT = "connection.timedout";
+    public static final String CONNECTION_TIMEOUT = "connection.timeout";
     public static final String CONNECTION_NO_ROUTE_TO_HOST = "connection.no_route_to_host";
-    public static final String CONNECTION_UNKNWON = "connection.unknwon";
+    public static final String CONNECTION_UNKNOWN = "connection.unknown";
 
     public static final String BIND_MAPPER_SESSION_INVALID = "bind.mapper_session_invalid";
     public static final String BIND_NETWORK_UNREACHABLE = "bind.network_unreachable";
     public static final String BIND_ADDRESS_ALREADY_IN_USE = "bind.address_already_in_use";
-    public static final String BIND_UNKNWON = "bind.unknown";
+    public static final String BIND_UNKNOWN = "bind.unknown";
 
     public static final String UNKNOWN_ERROR = "unknown_error";
 
@@ -34,14 +33,14 @@
             if (msg.contains("Address already in use")) {
                 return BIND_ADDRESS_ALREADY_IN_USE;
             } else {
-                return BIND_UNKNWON;
+                return BIND_UNKNOWN;
             }
         } else if (cause instanceof ConnectException) {
             String msg = cause.getMessage();
             if (msg.contains("Connection refused")) {
                 return CONNECTION_REFUSED;
             } else if (msg.contains("Connection timed out")) {
-                return CONNECTION_TIMEDOUT;
+                return CONNECTION_TIMEOUT;
             }
         } else if (cause instanceof SocketException) {
             String msg = cause.getMessage();
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 17 11:13:20 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 17 11:14:07 2020 +0200
@@ -2,7 +2,6 @@
 
 import com.passus.data.PooledByteBuffAllocator;
 import com.passus.net.PortRangeSet;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;