changeset 1290:b8dcd14f0c95

SocketEmitter - metrics bugfix
author Devel 2
date Tue, 14 Jul 2020 16:00:05 +0200
parents 7d538c69a6e1
children 610c12a93ee7
files stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 4 files changed, 31 insertions(+), 56 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Tue Jul 14 15:32:25 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Tue Jul 14 16:00:05 2020 +0200
@@ -41,24 +41,21 @@
 
     final boolean collectMetrics;
 
-    EmitterMetric metric;
+    final EmitterMetric metric;
 
     ByteBuff buffer = new HeapByteBuff();
 
     public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
                       Timeouts timeouts, SocketParameters socketParameters,
-                      boolean collectMetrics, ConnectionListener listener) {
+                      boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
         this.timeouts = timeouts;
         this.socketParameters = socketParameters;
-        this.listener = listener;
         this.collectMetrics = collectMetrics;
-        if (collectMetrics) {
-            metric = new EmitterMetric();
-            metric.activate();
-        }
+        this.metric = metric;
+        this.listener = listener;
     }
 
     public abstract boolean isConnected();
@@ -132,6 +129,12 @@
             logger.debug("Closing session '" + channelContext.getSessionInfo() + "'.");
         }
 
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.incClosedConnections();
+            }
+        }
+
         close();
         if (listener != null) {
             listener.onConnectionClosed(channelContext);
@@ -153,12 +156,6 @@
             logger.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
         }
 
-        if (collectMetrics) {
-            synchronized (metric) {
-                metric.incClosedConnections();
-            }
-        }
-
         working = false;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Tue Jul 14 15:32:25 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Tue Jul 14 16:00:05 2020 +0200
@@ -3,11 +3,8 @@
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.st.client.Timeouts;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.*;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
-import com.passus.st.emitter.SocketParameters;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,8 +35,8 @@
 
     public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
                               Timeouts timeouts, SocketParameters socketParameters,
-                              boolean collectMetrics, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
+                              boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener);
     }
 
     public boolean isConnected() {
@@ -131,12 +128,6 @@
         } catch (Exception ignore) {
 
         }
-
-        if (collectMetrics) {
-            synchronized (metric) {
-                metric.incClosedConnections();
-            }
-        }
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Tue Jul 14 15:32:25 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Tue Jul 14 16:00:05 2020 +0200
@@ -3,10 +3,7 @@
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.st.client.Timeouts;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
-import com.passus.st.emitter.SocketParameters;
+import com.passus.st.emitter.*;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -48,8 +45,8 @@
 
     public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper,
                             Timeouts timeouts, SocketParameters socketParameters,
-                            boolean collectMetrics, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
+                            boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener);
     }
 
     @Override
@@ -177,10 +174,6 @@
         closeQuietly(socket);
         closeQuietly(in);
         closeQuietly(out);
-
-        if (collectMetrics) {
-            metric.incClosedConnections();
-        }
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue Jul 14 15:32:25 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue Jul 14 16:00:05 2020 +0200
@@ -7,7 +7,6 @@
 import com.passus.config.annotations.NodeDefinitionCreate;
 import com.passus.config.schema.MapNodeDefinition;
 import com.passus.net.session.Session;
-import com.passus.st.client.PerNameMetricsContainer;
 import com.passus.st.client.Timeouts;
 import com.passus.st.config.SocketParametersNodeDefCreator;
 import com.passus.st.emitter.*;
@@ -44,14 +43,14 @@
 
     private int maxThreads = DEFAULT_NUM_THREADS;
 
-    private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer();
-
     private boolean started;
 
     private Timeouts timeouts = DEFAULT_TIMEOUTS;
 
     private SocketParameters socketParameters = DEFAULT_SOCKET_PARAMETERS;
 
+    private EmitterMetric metric;
+
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
@@ -95,19 +94,8 @@
     @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
-            connections.forEach((sessionInfo, connection) -> {
-                synchronized (connection.metric) {
-                    container.update(connection.metric);
-                }
-            });
-
-            if (!deferredMetrics.isEmpty()) {
-                deferredMetrics.getMetrics().forEach(m -> {
-                    container.update(m);
-                    m.reset();
-                });
-
-                deferredMetrics.clear();
+            synchronized (metric) {
+                container.update(metric);
             }
         }
     }
@@ -138,6 +126,10 @@
                 sessionMapper = Emitter.DEFAULT_SESSION_MAPPER;
             }
 
+            if (collectMetrics) {
+                metric = new EmitterMetric();
+            }
+
             started = true;
         }
     }
@@ -186,12 +178,14 @@
 
                 @Override
                 public void onConnectionClosed(ChannelContext channelContext) {
-                    if (collectMetrics) {
+                    /*if (collectMetrics) {
                         Connection connection = connections.get(channelContext.getSessionInfo());
                         if (connection != null) {
-                            deferredMetrics.update(connection.metric);
+                            synchronized (connection.metric) {
+                                deferredMetrics.update(connection.metric);
+                            }
                         }
-                    }
+                    }*/
 
                     connections.remove(channelContext.getSessionInfo());
                 }
@@ -200,9 +194,9 @@
 
             Connection connection;
             if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
-                connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
+                connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener);
             } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener);
+                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener);
             } else {
                 throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + ".");
             }