changeset 1185:a8cd127261a8

NcEventSourceMetric
author Devel 2
date Wed, 17 Jun 2020 12:39:18 +0200
parents 7941b4eadd4b
children 8b605b57e68d
files stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/source/NcEventSourceMetric.java
diffstat 2 files changed, 164 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Wed Jun 17 12:18:10 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Wed Jun 17 12:39:18 2020 +0200
@@ -13,6 +13,7 @@
 import com.passus.data.ByteBuffAllocator;
 import com.passus.data.ByteBuffDataSource;
 import com.passus.data.DefaultByteBuffAllocator;
+import com.passus.net.session.Session;
 import com.passus.st.ReqRespPair;
 import com.passus.st.client.DataEvents;
 import com.passus.st.client.DataEvents.DataEnd;
@@ -22,6 +23,8 @@
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.config.FileTransformer;
 import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
 import com.passus.st.reader.nc.*;
 import org.apache.logging.log4j.LogManager;
@@ -39,7 +42,7 @@
  */
 @NodeDefinitionCreate(NcEventSource.NcEventSourceNodeDefinitionCreator.class)
 @Plugin(name = NcEventSource.TYPE, category = PluginConstants.CATEGORY_EVENT_SOURCE)
-public class NcEventSource implements EventSource {
+public class NcEventSource implements EventSource, MetricSource {
 
     private static final Logger LOGGER = LogManager.getLogger(NcEventSource.class);
 
@@ -75,6 +78,10 @@
 
     private int loops = DEFAULT_LOOPS;
 
+    private boolean collectMetrics = true;
+
+    private NcEventSourceMetric metric = new NcEventSourceMetric();
+
     public NcEventSource() {
     }
 
@@ -149,6 +156,34 @@
     }
 
     @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    /**
+     * Turns metric collection on or off.
+     *
+     * <p>Enabling makes sure an activated {@link NcEventSourceMetric} exists. Disabling
+     * deactivates the current metric (which resets its counters if it was active) and
+     * drops the reference to it.
+     *
+     * @param collectMetrics {@code true} to collect metrics, {@code false} to stop
+     */
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        if (collectMetrics) {
+            if (metric == null) {
+                metric = new NcEventSourceMetric();
+            }
+            // The instance created by the field initializer is never activated, so
+            // activate here too instead of only for freshly created metrics.
+            if (!metric.isActive()) {
+                metric.activate();
+            }
+            this.collectMetrics = true;
+        } else if (metric != null) {
+            final NcEventSourceMetric current = metric;
+            // Lower the flag before touching the metric so the flag never claims
+            // collection is on while the metric is being torn down.
+            // NOTE(review): neither field is volatile, so visibility to other
+            // threads is not guaranteed by this ordering alone.
+            this.collectMetrics = false;
+            // deactivate() resets the counters; hold the same monitor that the other
+            // counter accesses in this class use.
+            synchronized (current) {
+                current.deactivate();
+            }
+            metric = null;
+        }
+    }
+
+    /**
+     * Passes the current metric to {@code container} and then resets its counters,
+     * so the counters restart from zero after every call.
+     *
+     * <p>Does nothing when metric collection is disabled or no metric is present.
+     *
+     * @param container receiver of the metric, via {@code container.update(metric)}
+     */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        // Read the field exactly once: setCollectMetrics(false) sets it to null, and
+        // locking on a re-read field could throw NullPointerException or end up
+        // holding the monitor of a different instance than the one being updated.
+        final NcEventSourceMetric current = metric;
+        if (!collectMetrics || current == null) {
+            return;
+        }
+
+        synchronized (current) {
+            container.update(current);
+            current.reset();
+        }
+    }
+
+    @Override
     public boolean isStarted() {
         return started;
     }
@@ -274,6 +309,18 @@
                         return;
                 }
 
+                if (collectMetrics) {
+                    synchronized (metric) {
+                        if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
+                            metric.incTcpPacket();
+                        } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
+                            metric.incUdpPacket();
+                        }
+
+                        metric.incEvents();
+                    }
+                }
+
                 handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), payloadBlock.proto(), getName()));
                 break;
             default:
@@ -343,6 +390,7 @@
         public NodeDefinition create() {
             return mapDef(
                     tupleDef("fileName", valueDef().setTransformer(new FileTransformer(true))),
+                    // Boolean entry explicitly marked as not required.
+                    // NOTE(review): the code that reads "collectMetrics" from the parsed
+                    // node is not visible here - confirm it is passed to setCollectMetrics.
+                    tupleDef("collectMetrics", valueDefBool()).setRequired(false),
                     tupleDef("loops", INT_GREATER_EQUAL_ZERO_DEF).setRequired(false)
             );
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSourceMetric.java	Wed Jun 17 12:39:18 2020 +0200
@@ -0,0 +1,115 @@
+package com.passus.st.source;
+
+import com.passus.commons.Assert;
+import com.passus.commons.metric.Metric;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Counters maintained for an {@code NcEventSource}: TCP packets, UDP packets and
+ * the total number of events.
+ *
+ * <p>The counter objects themselves are registered as attribute values under the keys
+ * {@code "tcpPackets"}, {@code "udpPackets"} and {@code "events"}, so reading an
+ * attribute always yields the live counter rather than a snapshot.
+ *
+ * <p>{@code tcpPackets} and {@code udpPackets} are plain {@link MutableInt}s and this
+ * class performs no locking of its own; {@code NcEventSource} wraps its accesses in
+ * {@code synchronized (metric)}.
+ */
+public class NcEventSourceMetric implements Metric {
+
+    /** Name used by the no-argument constructor. */
+    public static final String DEFAULT_NAME = "NcSource";
+
+    private final String name;
+
+    private final MutableInt tcpPackets = new MutableInt(0);
+    private final MutableInt udpPackets = new MutableInt(0);
+    // NOTE(review): this counter is atomic while the two above are not; the type is
+    // kept because the object is handed out through getAttributeValue().
+    private final AtomicInteger events = new AtomicInteger();
+
+    /** Attribute name to live counter object, in insertion order. */
+    private final Map<String, Serializable> attrs = new LinkedHashMap<>();
+
+    private boolean active;
+
+    /** Creates a metric named {@link #DEFAULT_NAME}. */
+    public NcEventSourceMetric() {
+        this(DEFAULT_NAME);
+    }
+
+    /**
+     * Creates a metric with the given name and registers the three counters as
+     * attributes. The metric starts inactive.
+     *
+     * @param name metric name, checked with {@code Assert.notNull}
+     */
+    public NcEventSourceMetric(String name) {
+        Assert.notNull(name, "name");
+        this.name = name;
+        attrs.put("tcpPackets", tcpPackets);
+        attrs.put("udpPackets", udpPackets);
+        attrs.put("events", events);
+    }
+
+    /** @return the name given at construction */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /** @return {@code true} after {@link #activate()} and until {@link #deactivate()} */
+    @Override
+    public boolean isActive() {
+        return active;
+    }
+
+    /** Marks this metric as active; the counters are left untouched. */
+    @Override
+    public void activate() {
+        active = true;
+    }
+
+    /**
+     * Resets all counters and marks this metric as inactive. Does nothing, and in
+     * particular does not reset, when the metric is already inactive.
+     */
+    @Override
+    public void deactivate() {
+        if (!active) {
+            return;
+        }
+
+        reset();
+        active = false;
+    }
+
+    /** Adds one to the TCP packet counter. */
+    public void incTcpPacket() {
+        tcpPackets.increment();
+    }
+
+    /** Adds one to the UDP packet counter. */
+    public void incUdpPacket() {
+        udpPackets.increment();
+    }
+
+    /** Adds one to the event counter. */
+    public void incEvents() {
+        events.incrementAndGet();
+    }
+
+    /**
+     * @param name attribute name to look up
+     * @return {@code true} if an attribute with that name is registered
+     */
+    @Override
+    public boolean hasAttribute(String name) {
+        return attrs.containsKey(name);
+    }
+
+    /**
+     * Returns the registered attribute names in insertion order.
+     *
+     * @return a read-only view of the attribute names
+     */
+    @Override
+    public Set<String> getAttributesName() {
+        // A bare keySet() is a writable view: removing from it would delete entries
+        // of the internal map. Wrap it, matching getAttributesValue().
+        return Collections.unmodifiableSet(attrs.keySet());
+    }
+
+    /**
+     * @param name attribute name to look up
+     * @return the live counter object registered under {@code name}, or {@code null}
+     *         if there is no such attribute
+     */
+    @Override
+    public Serializable getAttributeValue(String name) {
+        return attrs.get(name);
+    }
+
+    /** @return a read-only view of all attributes; the values are the live counters */
+    @Override
+    public Map<String, Serializable> getAttributesValue() {
+        return Collections.unmodifiableMap(attrs);
+    }
+
+    /** Sets all three counters back to zero; the active flag is not changed. */
+    @Override
+    public void reset() {
+        tcpPackets.setValue(0);
+        udpPackets.setValue(0);
+        events.set(0);
+    }
+
+    /**
+     * Adds the counters of {@code metric} to the counters of this instance.
+     *
+     * @param metric source of the values to add; must be an {@code NcEventSourceMetric}
+     * @throws IllegalArgumentException if {@code metric} is {@code null} or of any
+     *                                  other type
+     */
+    @Override
+    public void update(Metric metric) {
+        if (!(metric instanceof NcEventSourceMetric)) {
+            throw new IllegalArgumentException(NcEventSourceMetric.class + " required.");
+        }
+
+        NcEventSourceMetric ncMetric = (NcEventSourceMetric) metric;
+        tcpPackets.add(ncMetric.tcpPackets);
+        udpPackets.add(ncMetric.udpPackets);
+        events.addAndGet(ncMetric.events.get());
+    }
+}