Mercurial > stress-tester
changeset 1076:992057a5cc68
Metrics bugfixes
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Wed Apr 29 10:03:46 2020 +0200 @@ -8,7 +8,7 @@ private final FlowHandlerDataEncoder<R> encoder; private final FlowHandlerDataDecoder<S> decoder; - protected boolean collectMetrics = false; + protected boolean collectMetrics = DEFAULT_COLLECT_METRICS; protected T metric;
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Wed Apr 29 10:03:46 2020 +0200 @@ -14,7 +14,7 @@ TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - boolean collectMetrics = false; + boolean collectMetrics = DEFAULT_COLLECT_METRICS; DnsMetric metric;
--- a/stress-tester/src/main/java/com/passus/st/client/http/ReporterFileDestination.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/ReporterFileDestination.java Wed Apr 29 10:03:46 2020 +0200 @@ -240,7 +240,7 @@ builder.append(";;;;;"); } - metric = collection.getMetric("pcapSource"); + metric = collection.getMetric("PcapSource"); if (metric != null) { Map<String, Serializable> fields = metric.getAttributesValue(); addValue(builder, fields.get("frames"));
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowMetric.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowMetric.java Wed Apr 29 10:03:46 2020 +0200 @@ -21,7 +21,7 @@ public NetflowMetric(String name) { super(name); attrs.put("v5flows", v5flows); - attrs.put("v5flows", v5flows); + attrs.put("v9flows", v9flows); attrs.put("v10flows", v10flows); } @@ -38,7 +38,7 @@ break; case NetflowUtils.VERSION_10: Netflow10 netflow10 = (Netflow10) netflow; - v9flows += netflow10.getFlowSets().size(); + v10flows += netflow10.getFlowSets().size(); break; } } catch (Exception ignore) { @@ -49,9 +49,9 @@ @Override public void update(Metric metric) { NetflowMetric netflowMetric = (NetflowMetric) metric; - netflowMetric.v5flows += v5flows; - netflowMetric.v9flows += v9flows; - netflowMetric.v10flows += v10flows; + v5flows += netflowMetric.v5flows; + v9flows += netflowMetric.v9flows; + v10flows += netflowMetric.v10flows; } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Wed Apr 29 10:03:46 2020 +0200 @@ -3,42 +3,41 @@ import com.passus.commons.service.Service; import com.passus.config.Configurable; import com.passus.config.Configuration; -import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; -import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; - import com.passus.config.ConfigurationContext; import com.passus.config.schema.MapNodeDefinition; import com.passus.config.schema.NodeDefinitionCreator; import com.passus.st.metric.MetricSource; + import java.io.IOException; +import static com.passus.config.schema.ConfigurationSchemaBuilder.*; + /** - * * @author Mirosław Hawrot */ public interface Emitter extends Service, MetricSource, Configurable { - public static final SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper(); - - public static final boolean DEFAULT_COLLECT_METRICS = false; + SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper(); - public void setSessionMapper(SessionMapper mapper); + void setSessionMapper(SessionMapper mapper); - public SessionMapper getSessionMapper(); + SessionMapper getSessionMapper(); - public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException; + void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException; @Override - public default void configure(Configuration config, ConfigurationContext context) { + default void configure(Configuration config, ConfigurationContext context) { setSessionMapper((SessionMapper) config.get("sessionMapper")); + setCollectMetrics(config.get("collectMetrics", DEFAULT_COLLECT_METRICS)); } - public static abstract class EmitterNodeDefCreator implements NodeDefinitionCreator { + abstract class EmitterNodeDefCreator implements NodeDefinitionCreator { @Override public MapNodeDefinition create() { return mapDef( - tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false) + tupleDef("sessionMapper", SessionMapperNodeDefinitionCreator.createDef()).setRequired(false), + tupleDef("collectMetrics", valueDefBool()).setRequired(false) ); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Wed Apr 29 10:03:46 2020 +0200 @@ -4,17 +4,13 @@ import com.passus.commons.metric.Metric; import com.passus.net.SocketAddress; import com.passus.st.utils.NetExceptionsCategory; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableLong; +import java.io.Serializable; +import java.util.*; + /** - * * @author Mirosław Hawrot */ public class EmitterMetric implements Metric { @@ -29,6 +25,8 @@ private final MutableLong receivedBytes = new MutableLong(0); + private final MutableLong dropped = new MutableLong(0); + private final MutableInt establishedConnections = new MutableInt(0); private final MutableInt closedConnections = new MutableInt(0); @@ -59,6 +57,7 @@ attrs.put("bindErrors", bindErrors); attrs.put("remoteSocketConnections", (Serializable) remoteSocketConnections); attrs.put("errors", (Serializable) errors); + attrs.put("dropped", dropped); //attrs.put("bindedSockets", (Serializable) bindedSockets); } @@ -83,6 +82,14 @@ return receivedBytes.longValue(); } + public long dropped() { + return dropped.longValue(); + } + + public void incDropped() { + dropped.increment(); + } + public void incConnectionsErrors() { connectionsErrors.increment(); } @@ -135,7 +142,7 @@ count.increment(); } } - + @Override public boolean isActive() { return active;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Wed Apr 29 10:03:46 2020 +0200 @@ -135,7 +135,6 @@ Emitter.super.configure(config, context); setMaxThreads(config.getInteger("threads", DEFAULT_NUM_THREADS)); setConnectionTimeout(config.getLong("connectionTimeout", DEFAULT_CONNECTION_TIMEOUT)); - setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); } @Override @@ -220,10 +219,6 @@ valueDef().setTransformer(PeriodValueTransformer.INSTANCE) ).setRequired(false); - def.add("collectMetrics", - valueDefBool() - ).setRequired(false); - return def; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketEmitter.java Wed Apr 29 10:03:46 2020 +0200 @@ -8,6 +8,8 @@ import com.passus.net.session.Session; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; @@ -17,6 +19,8 @@ public abstract class UnidirectionalRawPacketEmitter<E> implements Emitter { + protected final Logger logger = LogManager.getLogger(getClass()); + private static final int DEFAULT_THREADS = 4; private static final int DEFAULT_MTU = 1500; private static final boolean DEFAULT_COLLECT_METRICS = true; @@ -34,10 +38,18 @@ private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; - private boolean collectMetrics = false; + private boolean collectMetrics = DEFAULT_COLLECT_METRICS; private int mtu = DEFAULT_MTU; + private EmitterMetric metric; + + protected final boolean trace; + + public UnidirectionalRawPacketEmitter() { + this.trace = logger.isTraceEnabled(); + } + @Override public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; @@ -99,7 +111,11 @@ @Override public void writeMetrics(MetricsContainer container) { - + if (collectMetrics) { + for (UnidirectionalRawPacketWorker<E> worker : workers) { + worker.writeMetrics(container); + } + } } @Override @@ -132,6 +148,7 @@ worker.setPortPool(portPool); worker.setMacResolver(macResolver); worker.setSessionMapper(sessionMapper); + worker.setCollectMetrics(collectMetrics); worker.setMtu(mtu); worker.start(); workers[i] = worker; @@ -165,7 +182,17 @@ @Override public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { if (sessionInfo.getTransport() != Session.PROTOCOL_UDP) { - throw new IOException("Only UDP transport supported."); + if (collectMetrics) { + synchronized (metric) { + metric.incDropped(); + } + } + + if (trace) { + logger.trace("Only UDP transport supported."); + } + + return; } int hashCode = (sessionInfo.hashCode() + workerIndex) & 0x7fffffff; @@ -188,5 +215,6 @@ ); return def; } + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/raw/UnidirectionalRawPacketWorker.java Wed Apr 29 10:03:46 2020 +0200 @@ -93,7 +93,6 @@ this.mtu = mtu; } - @Override public boolean isCollectMetrics() { return collectMetrics; @@ -102,13 +101,26 @@ @Override public void setCollectMetrics(boolean collectMetrics) { this.collectMetrics = collectMetrics; + if (collectMetrics) { + metric = new EmitterMetric(); + synchronized (metric) { + metric.activate(); + } + } else if (metric != null) { + synchronized (metric) { + metric.deactivate(); + } + metric = null; + } } @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { - container.update(System.currentTimeMillis(), metric); - metric.reset(); + synchronized (metric) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } } } @@ -259,7 +271,7 @@ } protected abstract int doWrite0(UnidirectionalRawPacketChannelContext<E> channelContext, byte[] frame, int length) throws IOException; - + private void doWrite(SessionInfo sessionInfo) { UnidirectionalRawPacketChannelContext<E> channelContext = sessions.get(sessionInfo); if (channelContext == null) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed Apr 29 10:03:46 2020 +0200 @@ -46,6 +46,10 @@ this.sessionMapper = sessionMapper; this.listener = listener; this.collectMetrics = collectMetrics; + if (collectMetrics) { + metric = new EmitterMetric(); + metric.activate(); + } } public abstract boolean isConnected(); @@ -147,7 +151,9 @@ } if (collectMetrics) { - metric.incClosedConnections(); + synchronized (metric) { + metric.incClosedConnections(); + } } working = false;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Apr 29 10:03:46 2020 +0200 @@ -1,6 +1,10 @@ package com.passus.st.emitter.socket; import com.passus.commons.annotations.Plugin; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.config.annotations.NodeDefinitionCreate; +import com.passus.config.schema.MapNodeDefinition; import com.passus.net.session.Session; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; @@ -12,6 +16,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +@NodeDefinitionCreate(SocketEmitter.SocketEmitterNodeDefCreator.class) @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class SocketEmitter implements Emitter { @@ -56,6 +61,22 @@ } @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + connections.forEach((sessionInfo, connection) -> { + synchronized (connection.metric) { + container.update(connection.metric); + } + }); + } + } + + @Override + public void configure(Configuration config, ConfigurationContext context) { + Emitter.super.configure(config, context); + } + + @Override public boolean isStarted() { return started; } @@ -143,8 +164,12 @@ } } - @Override - public void writeMetrics(MetricsContainer container) { + public static class SocketEmitterNodeDefCreator extends EmitterNodeDefCreator { + + @Override + public MapNodeDefinition create() { + return super.create(); + } }
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/metric/MetricSource.java Wed Apr 29 10:03:46 2020 +0200 @@ -5,6 +5,8 @@ */ public interface MetricSource { + boolean DEFAULT_COLLECT_METRICS = true; + boolean isCollectMetrics(); void setCollectMetrics(boolean collectMetrics);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Wed Apr 29 10:03:46 2020 +0200 @@ -212,7 +212,7 @@ loops = config.getInteger("loops", EventSource.DEFAULT_LOOPS); loopDelay = config.getLong("loopDelay", 0L); - setCollectMetrics(config.getBoolean("collectMetrics", Boolean.FALSE)); + setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); Map<String, Map> sessionProc = config.getMap("sessionProc", Collections.EMPTY_MAP); Map<String, Object> tcp = sessionProc.getOrDefault("tcp", Collections.EMPTY_MAP);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java Tue Apr 28 15:20:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java Wed Apr 29 10:03:46 2020 +0200 @@ -16,7 +16,7 @@ */ public class PcapSessionEventSourceMetric implements Metric { - public static final String DEFAULT_NAME = "pcapSource"; + public static final String DEFAULT_NAME = "PcapSource"; private final String name; @@ -41,6 +41,7 @@ this.name = name; attrs.put("frames", frames); attrs.put("tcpPackets", tcpPackets); + attrs.put("udpPackets", udpPackets); attrs.put("payloads", payloads); attrs.put("events", events); attrs.put("eventsEnqueued", eventsEnqueued); @@ -95,7 +96,6 @@ public void incDequeued() { eventsDequeued.increment(); - events.decrementAndGet(); } @Override @@ -122,6 +122,7 @@ public void reset() { frames.setValue(0); tcpPackets.setValue(0); + udpPackets.setValue(0); payloads.setValue(0); events.set(0); eventsEnqueued.setValue(0); @@ -137,6 +138,7 @@ PcapSessionEventSourceMetric pcapMetric = (PcapSessionEventSourceMetric) metric; frames.add(pcapMetric.frames); tcpPackets.add(pcapMetric.tcpPackets); + udpPackets.add(pcapMetric.udpPackets); payloads.add(pcapMetric.payloads); events.addAndGet(pcapMetric.events.get()); eventsEnqueued.add(pcapMetric.eventsEnqueued);