changeset 1076:992057a5cc68

Metrics bugfixes
author Devel 2
date Wed, 29 Apr 2020 10:03:46 +0200
parents c3bcec472408
children 2211bb11ab38
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/http/ReporterFileDestination.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowMetric.java stress-tester/src/main/java/com/passus/st/emitter/Emitter.java stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/main/java/com/passus/st/metric/MetricSource.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java
diffstat 14 files changed, 123 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Wed Apr 29 10:03:46 2020 +0200
@@ -8,7 +8,7 @@
     private final FlowHandlerDataEncoder<R> encoder;
     private final FlowHandlerDataDecoder<S> decoder;
 
-    protected boolean collectMetrics = false;
+    protected boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
     protected T metric;
 
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Wed Apr 29 10:03:46 2020 +0200
@@ -14,7 +14,7 @@
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    boolean collectMetrics = false;
+    boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
     DnsMetric metric;
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/ReporterFileDestination.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/ReporterFileDestination.java	Wed Apr 29 10:03:46 2020 +0200
@@ -240,7 +240,7 @@
             builder.append(";;;;;");
         }
 
-        metric = collection.getMetric("pcapSource");
+        metric = collection.getMetric("PcapSource");
         if (metric != null) {
             Map<String, Serializable> fields = metric.getAttributesValue();
             addValue(builder, fields.get("frames"));
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowMetric.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowMetric.java	Wed Apr 29 10:03:46 2020 +0200
@@ -21,7 +21,7 @@
     public NetflowMetric(String name) {
         super(name);
         attrs.put("v5flows", v5flows);
-        attrs.put("v5flows", v5flows);
+        attrs.put("v9flows", v9flows);
         attrs.put("v10flows", v10flows);
     }
 
@@ -38,7 +38,7 @@
                     break;
                 case NetflowUtils.VERSION_10:
                     Netflow10 netflow10 = (Netflow10) netflow;
-                    v9flows += netflow10.getFlowSets().size();
+                    v10flows += netflow10.getFlowSets().size();
                     break;
             }
         } catch (Exception ignore) {
@@ -49,9 +49,9 @@
     @Override
     public void update(Metric metric) {
         NetflowMetric netflowMetric = (NetflowMetric) metric;
-        netflowMetric.v5flows += v5flows;
-        netflowMetric.v9flows += v9flows;
-        netflowMetric.v10flows += v10flows;
+        v5flows += netflowMetric.v5flows;
+        v9flows += netflowMetric.v9flows;
+        v10flows += netflowMetric.v10flows;
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java	Wed Apr 29 10:03:46 2020 +0200
@@ -3,42 +3,41 @@
 import com.passus.commons.service.Service;
 import com.passus.config.Configurable;
 import com.passus.config.Configuration;
-import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
-import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
-
 import com.passus.config.ConfigurationContext;
 import com.passus.config.schema.MapNodeDefinition;
 import com.passus.config.schema.NodeDefinitionCreator;
 import com.passus.st.metric.MetricSource;
+
 import java.io.IOException;
 
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface Emitter extends Service, MetricSource, Configurable {
 
-    public static final SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper();
-
-    public static final boolean DEFAULT_COLLECT_METRICS = false;
+    SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper();
 
-    public void setSessionMapper(SessionMapper mapper);
+    void setSessionMapper(SessionMapper mapper);
 
-    public SessionMapper getSessionMapper();
+    SessionMapper getSessionMapper();
 
-    public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException;
+    void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException;
 
     @Override
-    public default void configure(Configuration config, ConfigurationContext context) {
+    default void configure(Configuration config, ConfigurationContext context) {
         setSessionMapper((SessionMapper) config.get("sessionMapper"));
+        setCollectMetrics(config.get("collectMetrics", DEFAULT_COLLECT_METRICS));
     }
 
-    public static abstract class EmitterNodeDefCreator implements NodeDefinitionCreator {
+    abstract class EmitterNodeDefCreator implements NodeDefinitionCreator {
 
         @Override
         public MapNodeDefinition create() {
             return mapDef(
-                    tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false)
+                    tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false),
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false)
             );
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Wed Apr 29 10:03:46 2020 +0200
@@ -4,17 +4,13 @@
 import com.passus.commons.metric.Metric;
 import com.passus.net.SocketAddress;
 import com.passus.st.utils.NetExceptionsCategory;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableLong;
 
+import java.io.Serializable;
+import java.util.*;
+
 /**
- *
  * @author Mirosław Hawrot
  */
 public class EmitterMetric implements Metric {
@@ -29,6 +25,8 @@
 
     private final MutableLong receivedBytes = new MutableLong(0);
 
+    private final MutableLong dropped = new MutableLong(0);
+
     private final MutableInt establishedConnections = new MutableInt(0);
 
     private final MutableInt closedConnections = new MutableInt(0);
@@ -59,6 +57,7 @@
         attrs.put("bindErrors", bindErrors);
         attrs.put("remoteSocketConnections", (Serializable) remoteSocketConnections);
         attrs.put("errors", (Serializable) errors);
+        attrs.put("dropped", dropped);
         //attrs.put("bindedSockets", (Serializable) bindedSockets);
     }
 
@@ -83,6 +82,14 @@
         return receivedBytes.longValue();
     }
 
+    public long dropped() {
+        return dropped.longValue();
+    }
+
+    public void incDropped() {
+        dropped.increment();
+    }
+
     public void incConnectionsErrors() {
         connectionsErrors.increment();
     }
@@ -135,7 +142,7 @@
             count.increment();
         }
     }
-    
+
     @Override
     public boolean isActive() {
         return active;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Wed Apr 29 10:03:46 2020 +0200
@@ -135,7 +135,6 @@
         Emitter.super.configure(config, context);
         setMaxThreads(config.getInteger("threads", DEFAULT_NUM_THREADS));
         setConnectionTimeout(config.getLong("connectionTimeout", DEFAULT_CONNECTION_TIMEOUT));
-        setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
     }
 
     @Override
@@ -220,10 +219,6 @@
                     valueDef().setTransformer(PeriodValueTransformer.INSTANCE)
             ).setRequired(false);
 
-            def.add("collectMetrics",
-                    valueDefBool()
-            ).setRequired(false);
-
             return def;
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java	Wed Apr 29 10:03:46 2020 +0200
@@ -8,6 +8,8 @@
 import com.passus.net.session.Session;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 
@@ -17,6 +19,8 @@
 
 public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter {
 
+    protected final Logger logger = LogManager.getLogger(getClass());
+
     private static final int DEFAULT_THREADS = 4;
     private static final int DEFAULT_MTU = 1500;
     private static final boolean DEFAULT_COLLECT_METRICS = true;
@@ -34,10 +38,18 @@
 
     private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
 
-    private boolean collectMetrics = false;
+    private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
     private int mtu = DEFAULT_MTU;
 
+    private EmitterMetric metric;
+
+    protected final boolean trace;
+
+    public UnidirectionalRawPacketEmitter() {
+        this.trace = logger.isTraceEnabled();
+    }
+
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
@@ -99,7 +111,11 @@
 
     @Override
     public void writeMetrics(MetricsContainer container) {
-
+        if (collectMetrics) {
+            for (UnidirectionalRawPacketWorker<E> worker : workers) {
+                worker.writeMetrics(container);
+            }
+        }
     }
 
     @Override
@@ -132,6 +148,7 @@
             worker.setPortPool(portPool);
             worker.setMacResolver(macResolver);
             worker.setSessionMapper(sessionMapper);
+            worker.setCollectMetrics(collectMetrics);
             worker.setMtu(mtu);
             worker.start();
             workers[i] = worker;
@@ -165,7 +182,17 @@
     @Override
     public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
         if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) {
-            throw new IOException("Only UDP transport supported.");
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.incDropped();
+                }
+            }
+
+            if (trace) {
+                logger.trace("Only UDP transport supported.");
+            }
+
+            return;
         }
 
         int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff;
@@ -188,5 +215,6 @@
             );
             return def;
         }
+
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java	Wed Apr 29 10:03:46 2020 +0200
@@ -93,7 +93,6 @@
         this.mtu = mtu;
     }
 
-
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
@@ -102,13 +101,26 @@
     @Override
     public void setCollectMetrics(boolean collectMetrics) {
         this.collectMetrics = collectMetrics;
+        if (collectMetrics) {
+            metric = new EmitterMetric();
+            synchronized (metric) {
+                metric.activate();
+            }
+        } else if (metric != null) {
+            synchronized (metric) {
+                metric.deactivate();
+            }
+            metric = null;
+        }
     }
 
     @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
-            container.update(System.currentTimeMillis(), metric);
-            metric.reset();
+            synchronized (metric) {
+                container.update(System.currentTimeMillis(), metric);
+                metric.reset();
+            }
         }
     }
 
@@ -259,7 +271,7 @@
     }
 
     protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext, byte[] frame, int length) throws IOException;
-    
+
     private void doWrite(SessionInfo sessionInfo) {
         UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo);
         if (channelContext == null) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Apr 29 10:03:46 2020 +0200
@@ -46,6 +46,10 @@
         this.sessionMapper = sessionMapper;
         this.listener = listener;
         this.collectMetrics = collectMetrics;
+        if (collectMetrics) {
+            metric = new EmitterMetric();
+            metric.activate();
+        }
     }
 
     public abstract boolean isConnected();
@@ -147,7 +151,9 @@
         }
 
         if (collectMetrics) {
-            metric.incClosedConnections();
+            synchronized (metric) {
+                metric.incClosedConnections();
+            }
         }
 
         working = false;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Apr 29 10:03:46 2020 +0200
@@ -1,6 +1,10 @@
 package com.passus.st.emitter.socket;
 
 import com.passus.commons.annotations.Plugin;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.annotations.NodeDefinitionCreate;
+import com.passus.config.schema.MapNodeDefinition;
 import com.passus.net.session.Session;
 import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
@@ -12,6 +16,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+@NodeDefinitionCreate(SocketEmitter.SocketEmitterNodeDefCreator.class)
 @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class SocketEmitter implements Emitter {
 
@@ -56,6 +61,22 @@
     }
 
     @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics) {
+            connections.forEach((sessionInfo, connection) -> {
+                synchronized (connection.metric) {
+                    container.update(connection.metric);
+                }
+            });
+        }
+    }
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        Emitter.super.configure(config, context);
+    }
+
+    @Override
     public boolean isStarted() {
         return started;
     }
@@ -143,8 +164,12 @@
         }
     }
 
-    @Override
-    public void writeMetrics(MetricsContainer container) {
+    public static class SocketEmitterNodeDefCreator extends EmitterNodeDefCreator {
+
+        @Override
+        public MapNodeDefinition create() {
+            return super.create();
+        }
 
     }
 
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java	Wed Apr 29 10:03:46 2020 +0200
@@ -5,6 +5,8 @@
  */
 public interface MetricSource {
 
+    boolean DEFAULT_COLLECT_METRICS = true;
+
     boolean isCollectMetrics();
 
     void setCollectMetrics(boolean collectMetrics);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Wed Apr 29 10:03:46 2020 +0200
@@ -212,7 +212,7 @@
         loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS);
         loopDelay = config.getLong("loopDelay", 0L);
 
-        setCollectMetrics(config.getBoolean("collectMetrics", Boolean.FALSE));
+        setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS));
 
         Map<String, Map> sessionProc = config.getMap("sessionProc", Collections.EMPTY_MAP);
         Map<String, Object> tcp = sessionProc.getOrDefault("tcp", Collections.EMPTY_MAP);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java	Tue Apr 28 15:20:57 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java	Wed Apr 29 10:03:46 2020 +0200
@@ -16,7 +16,7 @@
  */
 public class PcapSessionEventSourceMetric implements Metric {
 
-    public static final String DEFAULT_NAME = "pcapSource";
+    public static final String DEFAULT_NAME = "PcapSource";
 
     private final String name;
 
@@ -41,6 +41,7 @@
         this.name = name;
         attrs.put("frames", frames);
         attrs.put("tcpPackets", tcpPackets);
+        attrs.put("udpPackets", udpPackets);
         attrs.put("payloads", payloads);
         attrs.put("events", events);
         attrs.put("eventsEnqueued", eventsEnqueued);
@@ -95,7 +96,6 @@
 
     public void incDequeued() {
         eventsDequeued.increment();
-        events.decrementAndGet();
     }
 
     @Override
@@ -122,6 +122,7 @@
     public void reset() {
         frames.setValue(0);
         tcpPackets.setValue(0);
+        udpPackets.setValue(0);
         payloads.setValue(0);
         events.set(0);
         eventsEnqueued.setValue(0);
@@ -137,6 +138,7 @@
         PcapSessionEventSourceMetric pcapMetric = (PcapSessionEventSourceMetric) metric;
         frames.add(pcapMetric.frames);
         tcpPackets.add(pcapMetric.tcpPackets);
+        udpPackets.add(pcapMetric.udpPackets);
         payloads.add(pcapMetric.payloads);
         events.addAndGet(pcapMetric.events.get());
         eventsEnqueued.add(pcapMetric.eventsEnqueued);