Mercurial > stress-tester
changeset 1290:b8dcd14f0c95
SocketEmitter - metrics bugfix
author | Devel 2 |
---|---|
date | Tue, 14 Jul 2020 16:00:05 +0200 |
parents | 7d538c69a6e1 |
children | 610c12a93ee7 |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 4 files changed, 31 insertions(+), 56 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Tue Jul 14 15:32:25 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Tue Jul 14 16:00:05 2020 +0200 @@ -41,24 +41,21 @@ final boolean collectMetrics; - EmitterMetric metric; + final EmitterMetric metric; ByteBuff buffer = new HeapByteBuff(); public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, SocketParameters socketParameters, - boolean collectMetrics, ConnectionListener listener) { + boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) { this.sessionInfo = sessionInfo; this.handler = handler; this.sessionMapper = sessionMapper; this.timeouts = timeouts; this.socketParameters = socketParameters; - this.listener = listener; this.collectMetrics = collectMetrics; - if (collectMetrics) { - metric = new EmitterMetric(); - metric.activate(); - } + this.metric = metric; + this.listener = listener; } public abstract boolean isConnected(); @@ -132,6 +129,12 @@ logger.debug("Closing session '" + channelContext.getSessionInfo() + "'."); } + if (collectMetrics) { + synchronized (metric) { + metric.incClosedConnections(); + } + } + close(); if (listener != null) { listener.onConnectionClosed(channelContext); @@ -153,12 +156,6 @@ logger.debug("Closed session '" + channelContext.getSessionInfo() + "'."); } - if (collectMetrics) { - synchronized (metric) { - metric.incClosedConnections(); - } - } - working = false; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Tue Jul 14 15:32:25 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Tue Jul 14 16:00:05 2020 +0200 @@ -3,11 +3,8 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; import com.passus.st.client.Timeouts; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.*; import com.passus.st.emitter.SessionMapper.ConnectionParams; -import com.passus.st.emitter.SocketParameters; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,8 +35,8 @@ public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, SocketParameters socketParameters, - boolean collectMetrics, ConnectionListener listener) { - super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener); + boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener); } public boolean isConnected() { @@ -131,12 +128,6 @@ } catch (Exception ignore) { } - - if (collectMetrics) { - synchronized (metric) { - metric.incClosedConnections(); - } - } } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Tue Jul 14 15:32:25 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Tue Jul 14 16:00:05 2020 +0200 @@ -3,10 +3,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; import com.passus.st.client.Timeouts; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; -import com.passus.st.emitter.SocketParameters; +import com.passus.st.emitter.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,8 +45,8 @@ public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, Timeouts timeouts, SocketParameters socketParameters, - boolean collectMetrics, ConnectionListener listener) { - super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener); + boolean collectMetrics, EmitterMetric metric, ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener); } @Override @@ -177,10 +174,6 @@ closeQuietly(socket); closeQuietly(in); closeQuietly(out); - - if (collectMetrics) { - metric.incClosedConnections(); - } } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue Jul 14 15:32:25 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue Jul 14 16:00:05 2020 +0200 @@ -7,7 +7,6 @@ import com.passus.config.annotations.NodeDefinitionCreate; import com.passus.config.schema.MapNodeDefinition; import com.passus.net.session.Session; -import com.passus.st.client.PerNameMetricsContainer; import com.passus.st.client.Timeouts; import com.passus.st.config.SocketParametersNodeDefCreator; import com.passus.st.emitter.*; @@ -44,14 +43,14 @@ private int maxThreads = DEFAULT_NUM_THREADS; - private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer(); - private boolean started; private Timeouts timeouts = DEFAULT_TIMEOUTS; private SocketParameters socketParameters = DEFAULT_SOCKET_PARAMETERS; + private EmitterMetric metric; + @Override public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; @@ -95,19 +94,8 @@ @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { - connections.forEach((sessionInfo, connection) -> { - synchronized (connection.metric) { - container.update(connection.metric); - } - }); - - if (!deferredMetrics.isEmpty()) { - deferredMetrics.getMetrics().forEach(m -> { - container.update(m); - m.reset(); - }); - - deferredMetrics.clear(); + synchronized (metric) { + container.update(metric); } } } @@ -138,6 +126,10 @@ sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; } + if (collectMetrics) { + metric = new EmitterMetric(); + } + started = true; } } @@ -186,12 +178,14 @@ @Override public void onConnectionClosed(ChannelContext channelContext) { - if (collectMetrics) { + /*if (collectMetrics) { Connection connection = connections.get(channelContext.getSessionInfo()); if (connection != null) { - deferredMetrics.update(connection.metric); + synchronized (connection.metric) { + deferredMetrics.update(connection.metric); + } } - } + }*/ connections.remove(channelContext.getSessionInfo()); } @@ -200,9 +194,9 @@ Connection connection; if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { - connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener); + connection = new SocketConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener); } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { - connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, listener); + connection = new DatagramConnection(sessionInfo, handler, sessionMapper, timeouts, socketParameters, collectMetrics, metric, listener); } else { throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + "."); }