Mercurial > stress-tester
changeset 1062:46ebffd7b305
Metrics improvements + bugfixes
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowProcessorSupervisorWrapper.java Fri Apr 17 11:14:07 2020 +0200 @@ -52,7 +52,7 @@ @Override public boolean isCollectMetrics() { - return flowWorker.collectMetric; + return flowWorker.collectMetrics; } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Fri Apr 17 11:14:07 2020 +0200 @@ -8,7 +8,7 @@ import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; import com.passus.st.client.netflow.NetflowFlowHandler; -import com.passus.st.client.pgsql.PgSqlFlowHandlerNetflowFlowHandler; +import com.passus.st.client.pgsql.PgSqlFlowHandler; import static com.passus.st.Protocols.*; @@ -24,7 +24,7 @@ case NETFLOW: return new NetflowFlowHandler(); case PGSQL: - return new PgSqlFlowHandlerNetflowFlowHandler(); + return new PgSqlFlowHandler(); } @@ -44,7 +44,7 @@ } else if (message instanceof Netflow) { return new NetflowFlowHandler(); } else if (message instanceof PgSqlMessage) { - return new PgSqlFlowHandlerNetflowFlowHandler(); + return new PgSqlFlowHandler(); } throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'.");
--- a/stress-tester/src/main/java/com/passus/st/client/FlowMetric.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowMetric.java Fri Apr 17 11:14:07 2020 +0200 @@ -7,7 +7,7 @@ public class FlowMetric extends GenericMetric { - public static final String FLOW_METRIC = "flow"; + public static final String FLOW_METRIC = "Flow"; private final LongHistogramMetric requestSendingTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri Apr 17 11:14:07 2020 +0200 @@ -31,7 +31,7 @@ protected FlowHandlerFactory clientFactory = new FlowHandlerFactoryImpl(); - protected boolean collectMetric; + protected boolean collectMetrics; protected FlowMetric metric; @@ -155,10 +155,10 @@ if (collectMetrics && metric == null) { metric = new FlowMetric(); metric.activate(); - collectMetric = true; + this.collectMetrics = true; } else if (!collectMetrics && metric != null) { metric.deactivate(); - collectMetric = false; + this.collectMetrics = false; metric = null; } } @@ -176,7 +176,7 @@ @Override public void writeMetrics(MetricsContainer container) { - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { container.update(System.currentTimeMillis(), metric); metric.reset();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Fri Apr 17 11:14:07 2020 +0200 @@ -504,7 +504,7 @@ } if (decoder.state() == DataDecoder.STATE_ERROR) { - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { metric.incErrorNum(); } @@ -517,7 +517,7 @@ decoder.clear(flowContext); responseReceived0(flowContext, null); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { metric.incResponsesNum(); metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); @@ -542,7 +542,7 @@ responseReceived0(flowContext, resp); } } catch (Exception e) { - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { metric.incErrorNum(); } @@ -581,7 +581,7 @@ try { if (flowContext.isEventSent()) { long now = timeGenerator.currentTimeMillis(); - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); } @@ -668,7 +668,7 @@ return; } - if (collectMetric) { + if (collectMetrics) { synchronized (metric) { metric.incRequestsNum(); metric.addRequestSize(flowContext.buffer().readableBytes());
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/PerNameMetricsContainer.java Fri Apr 17 11:14:07 2020 +0200 @@ -0,0 +1,46 @@ +package com.passus.st.client; + +import com.passus.commons.metric.Metric; +import com.passus.st.metric.MetricsContainer; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class PerNameMetricsContainer implements MetricsContainer { + + private Map<String, Metric> metrics = new HashMap<>(); + + @Override + public int size() { + return metrics.size(); + } + + public Collection<Metric> getMetrics() { + return metrics.values(); + } + + @Override + public void clear() { + metrics.clear(); + } + + @Override + public boolean update(long time, Metric metric) { + Metric rangeMetric = metrics.get(metric.getName()); + if (rangeMetric == null) { + synchronized (metric) { + rangeMetric = metric.copy(); + } + + metrics.put(metric.getName(), rangeMetric); + } else { + synchronized (metric) { + rangeMetric.update(metric); + } + } + + return true; + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri Apr 17 11:14:07 2020 +0200 @@ -33,6 +33,8 @@ private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); + private final PerNameMetricsContainer deferredMetrics = new PerNameMetricsContainer(); + private long eventsQueueWaitTime = 100; private int loop = 0; @@ -87,6 +89,15 @@ f.client.writeMetrics(container); } }); + + if (!deferredMetrics.isEmpty()) { + deferredMetrics.getMetrics().forEach(m -> { + container.update(m); + m.reset(); + }); + + deferredMetrics.clear(); + } } protected FlowContext register(SessionEvent sessionEvent) { @@ -103,7 +114,7 @@ flowContext.createLock(); FlowHandler client = clientFactory.create(session.getProtocolId()); client.init(flowContext); - client.setCollectMetrics(collectMetric); + client.setCollectMetrics(collectMetrics); flowContext.client(client); sessions.put(session, flowContext); return flowContext; @@ -313,6 +324,10 @@ @Override public void onDisconnected(FlowContext flowContext) { + if (collectMetrics) { + flowContext.client.writeMetrics(deferredMetrics); + } + sessions.remove(flowContext.session); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerMetric.java Fri Apr 17 11:13:20 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,82 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.metric.LongHistogramMetric; -import com.passus.commons.metric.LongHistogramMetricImpl; -import com.passus.commons.metric.Metric; -import com.passus.st.client.GenericMetric; -import org.apache.commons.lang3.mutable.MutableInt; - -import java.util.Map; -import java.util.TreeMap; - -/** - * @author Mirosław Hawrot - */ -public class HttpClientWorkerMetric extends GenericMetric { - - public static final String DEFAULT_NAME = "HTTP"; - - private final TreeMap<Integer, MutableInt> respStatuses = new TreeMap<>(); - - private final LongHistogramMetric responseTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES); - - private final LongHistogramMetric responseSizeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES); - - public HttpClientWorkerMetric() { - this(DEFAULT_NAME); - } - - public HttpClientWorkerMetric(String name) { - super(name); - attrs.put("responseStatusCodes", respStatuses); - attrs.put("responseTimeDist", responseTimeHistogram); - attrs.put("responseSizeHistogram", responseSizeHistogram); - - } - - @Override - protected void doActivate() { - super.doActivate(); - responseTimeHistogram.activate(); - responseSizeHistogram.activate(); - } - - @Override - protected void doDeactivate() { - super.doDeactivate(); - responseTimeHistogram.deactivate(); - responseSizeHistogram.deactivate(); - } - - public Map<Integer, MutableInt> getStatusCodes() { - return respStatuses; - } - - public void addResponseStatusCode(Integer statusCode) { - Metric.incrementCountMap(statusCode, respStatuses); - } - - public void addResponseTime(long time) { - responseTimeHistogram.update(time); - } - - public void addResponseSize(long size) { - responseSizeHistogram.update(size); - } - - @Override - public void reset() { - respStatuses.clear(); - responseTimeHistogram.reset(); - responseSizeHistogram.reset(); - } - - @Override - public void update(Metric metric) { - HttpClientWorkerMetric clientMetric = (HttpClientWorkerMetric) metric; - Metric.updateCountMap(respStatuses, clientMetric.respStatuses); - responseTimeHistogram.update(clientMetric.responseTimeHistogram); - responseSizeHistogram.update(clientMetric.responseSizeHistogram); - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Fri Apr 17 11:14:07 2020 +0200 @@ -22,7 +22,7 @@ boolean collectMetrics = false; - HttpClientWorkerMetric metric; + HttpMetric metric; HttpScopes scopes; @@ -73,7 +73,7 @@ public void setCollectMetrics(boolean collectMetrics) { this.collectMetrics = collectMetrics; if (collectMetrics) { - metric = new HttpClientWorkerMetric(); + metric = new HttpMetric(); synchronized (metric) { metric.activate(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpMetric.java Fri Apr 17 11:14:07 2020 +0200 @@ -0,0 +1,82 @@ +package com.passus.st.client.http; + +import com.passus.commons.metric.LongHistogramMetric; +import com.passus.commons.metric.LongHistogramMetricImpl; +import com.passus.commons.metric.Metric; +import com.passus.st.client.GenericMetric; +import org.apache.commons.lang3.mutable.MutableInt; + +import java.util.Map; +import java.util.TreeMap; + +/** + * @author Mirosław Hawrot + */ +public class HttpMetric extends GenericMetric { + + public static final String DEFAULT_NAME = "HTTP"; + + private final TreeMap<Integer, MutableInt> respStatuses = new TreeMap<>(); + + private final LongHistogramMetric responseTimeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES); + + private final LongHistogramMetric responseSizeHistogram = new LongHistogramMetricImpl(DEFAULT_HISTOGRAM_VALUES); + + public HttpMetric() { + this(DEFAULT_NAME); + } + + public HttpMetric(String name) { + super(name); + attrs.put("responseStatusCodes", respStatuses); + attrs.put("responseTimeDist", responseTimeHistogram); + attrs.put("responseSizeHistogram", responseSizeHistogram); + + } + + @Override + protected void doActivate() { + super.doActivate(); + responseTimeHistogram.activate(); + responseSizeHistogram.activate(); + } + + @Override + protected void doDeactivate() { + super.doDeactivate(); + responseTimeHistogram.deactivate(); + responseSizeHistogram.deactivate(); + } + + public Map<Integer, MutableInt> getStatusCodes() { + return respStatuses; + } + + public void addResponseStatusCode(Integer statusCode) { + Metric.incrementCountMap(statusCode, respStatuses); + } + + public void addResponseTime(long time) { + responseTimeHistogram.update(time); + } + + public void addResponseSize(long size) { + responseSizeHistogram.update(size); + } + + @Override + public void reset() { + respStatuses.clear(); + responseTimeHistogram.reset(); + responseSizeHistogram.reset(); + } + + @Override + public void update(Metric metric) { + HttpMetric clientMetric = (HttpMetric) metric; + Metric.updateCountMap(respStatuses, clientMetric.respStatuses); + responseTimeHistogram.update(clientMetric.responseTimeHistogram); + responseSizeHistogram.update(clientMetric.responseSizeHistogram); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlMetric.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlMetric.java Fri Apr 17 11:14:07 2020 +0200 @@ -1,7 +1,11 @@ package com.passus.st.client.pgsql; +import com.passus.commons.metric.Metric; import com.passus.st.client.DbMetric; import com.passus.st.utils.DbUtils; +import org.apache.commons.lang3.mutable.MutableInt; + +import java.util.HashMap; public class PgSqlMetric extends DbMetric { @@ -9,12 +13,32 @@ private final StringBuilder sb = new StringBuilder(); + private final HashMap<String, MutableInt> errorCodesCount = new HashMap<>(); + + public PgSqlMetric() { + this(DEFAULT_NAME); + } + public PgSqlMetric(String name) { super(name); + attrs.put("errorCodes", errorCodesCount); } - public PgSqlMetric() { - super(DEFAULT_NAME); + public void addErrorCode(String status) { + Metric.incrementCountMap(status, errorCodesCount); + } + + @Override + public void update(Metric metric) { + super.update(metric); + PgSqlMetric dbMetric = (PgSqlMetric) metric; + Metric.updateCountMap(errorCodesCount, dbMetric.errorCodesCount); + } + + @Override + public void reset() { + super.reset(); + errorCodesCount.clear(); } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Fri Apr 17 11:14:07 2020 +0200 @@ -19,7 +19,7 @@ */ public class EmitterMetric implements Metric { - public static final String DEFAULT_NAME = "emitter"; + public static final String DEFAULT_NAME = "Emitter"; private final Map<String, Serializable> attrs = new LinkedHashMap<>();
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Fri Apr 17 11:14:07 2020 +0200 @@ -8,7 +8,6 @@ import com.passus.st.metric.MetricsContainer; import java.io.IOException; -import java.net.ConnectException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -123,8 +122,7 @@ logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); } - throw new ConnectException("Connection timed out."); - + throw new IOException("Connection timed out (" + connectionTimeout + "ms)."); } } catch (Exception e) { doCatchException(key, e);
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsBatchTimeWindow.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsBatchTimeWindow.java Fri Apr 17 11:14:07 2020 +0200 @@ -1,11 +1,11 @@ package com.passus.st.metric; import com.passus.commons.metric.Metric; + import java.util.ArrayDeque; import java.util.List; /** - * * @author Mirosław Hawrot */ class MetricsBatchTimeWindow implements MetricsContainer { @@ -34,6 +34,12 @@ return period; } + @Override + public int size() { + return batch.size(); + } + + @Override public void clear() { batch.clear(); } @@ -186,8 +192,5 @@ return results; } - public int size() { - return batch.size(); - } }
--- a/stress-tester/src/main/java/com/passus/st/metric/MetricsContainer.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/metric/MetricsContainer.java Fri Apr 17 11:14:07 2020 +0200 @@ -11,6 +11,14 @@ return update(System.currentTimeMillis(), metric); } + default boolean isEmpty() { + return size() == 0; + } + + void clear(); + + int size(); + boolean update(long time, Metric metric); }
--- a/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java Fri Apr 17 11:14:07 2020 +0200 @@ -6,20 +6,19 @@ import java.net.SocketException; /** - * * @author Mirosław Hawrot */ public class NetExceptionsCategory { public static final String CONNECTION_REFUSED = "connection.refused"; - public static final String CONNECTION_TIMEDOUT = "connection.timedout"; + public static final String CONNECTION_TIMEOUT = "connection.timeout"; public static final String CONNECTION_NO_ROUTE_TO_HOST = "connection.no_route_to_host"; - public static final String CONNECTION_UNKNWON = "connection.unknwon"; + public static final String CONNECTION_UNKNOWN = "connection.unknown"; public static final String BIND_MAPPER_SESSION_INVALID = "bind.mapper_session_invalid"; public static final String BIND_NETWORK_UNREACHABLE = "bind.network_unreachable"; public static final String BIND_ADDRESS_ALREADY_IN_USE = "bind.address_already_in_use"; - public static final String BIND_UNKNWON = "bind.unknown"; + public static final String BIND_UNKNOWN = "bind.unknown"; public static final String UNKNOWN_ERROR = "unknown_error"; @@ -34,14 +33,14 @@ if (msg.contains("Address already in use")) { return BIND_ADDRESS_ALREADY_IN_USE; } else { - return BIND_UNKNWON; + return BIND_UNKNOWN; } } else if (cause instanceof ConnectException) { String msg = cause.getMessage(); if (msg.contains("Connection refused")) { return CONNECTION_REFUSED; } else if (msg.contains("Connection timed out")) { - return CONNECTION_TIMEDOUT; + return CONNECTION_TIMEOUT; } } else if (cause instanceof SocketException) { String msg = cause.getMessage();
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Apr 17 11:13:20 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Fri Apr 17 11:14:07 2020 +0200 @@ -2,7 +2,6 @@ import com.passus.data.PooledByteBuffAllocator; import com.passus.net.PortRangeSet; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event;