Mercurial > stress-tester
changeset 431:bbc6b52ab089
Reporter - MetricsCollection
line wrap: on
line diff
--- a/stress-tester-reporter/src/main/avro/reporter.avdl Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/avro/reporter.avdl Thu Jul 27 12:58:49 2017 +0200 @@ -8,7 +8,7 @@ record MetricRecord { string code; - map<union { null, int, long, float, double, string, array<HttpHeader>, map<union {null, int, long, float, double, string}> }> fields; + map<union { null, int, long, float, double, string, array<HttpHeader>, map<union {null, int, long, float, double, string}>, MetricRecord }> fields; } record MetricsCollectionRecord {
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/MetricConverter.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/MetricConverter.java Thu Jul 27 12:58:49 2017 +0200 @@ -1,26 +1,83 @@ package com.passus.st.reporter; import com.passus.commons.metric.Metric; +import com.passus.commons.metric.MetricsCollection; import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; import java.io.Serializable; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableLong; /** * * @author mikolaj.podbielski */ public class MetricConverter { - - public MetricRecord convert(Metric metric) { + + private interface Converter<I extends Object> { + + Serializable convert(I input); + } + + private final Map<Class, Converter> converters = new HashMap<>(); + + { + converters.put(MutableLong.class, (Converter<MutableLong>) MutableLong::getValue); + converters.put(MutableInt.class, (Converter<MutableInt>) MutableInt::getValue); + } + + private Serializable convertIfNeeded(Object o) { + Converter conv = converters.get(o.getClass()); + return conv == null ? o.toString() : conv.convert(o); + } + + private HashMap processEntries(HashMap<Object, Object> map) { + try { + HashMap proc = map.getClass().newInstance(); + for (Map.Entry<Object, Object> e : map.entrySet()) { + Object key = convertIfNeeded(e.getKey()); + Object value = convertIfNeeded(e.getValue()); + proc.put(key, value); + } + return proc; + } catch (ReflectiveOperationException ex) { + throw new RuntimeException("Could not instantiate " + map.getClass()); + } + } + + public MetricRecord convert(Metric metric, boolean procEmbeddedMetric) { Map<CharSequence, Object> fields = new HashMap<>(); Set<String> names = metric.getAttributesName(); for (String name : names) { Serializable value = metric.getAttributeValue(name); - fields.put(name, value); + if (value != null) { + Converter converter = converters.get(value.getClass()); + if (converter != null) { + value = converter.convert(value); + } else if (procEmbeddedMetric && value instanceof Metric) { + value = convert(metric, false); + } else if (procEmbeddedMetric && value instanceof HashMap) { + value = processEntries((HashMap) value); + } + fields.put(name, value); + } } - + return new MetricRecord(metric.getName(), fields); } + + private final Set<String> allowed = new HashSet<>(Arrays.asList("emitter", "pcapSource")); + + public MetricsCollectionRecord convert(MetricsCollection collection) { + List<Metric> metrics = collection.getMetrics(); + List<MetricRecord> records = metrics.stream().filter(m -> allowed.contains(m.getName())).map((Metric m) -> convert(m, true)).collect(Collectors.toList()); + return new MetricsCollectionRecord(collection.getStartTimestamp(), collection.getEndTimestamp(), records); + } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricRecord.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricRecord.java Thu Jul 27 12:58:49 2017 +0200 @@ -13,8 +13,8 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class MetricRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = 5768471237763458493L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]}"); + private static final long serialVersionUID = -4158323429014862621L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } private static SpecificData MODEL$ = new SpecificData();
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricsCollectionRecord.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricsCollectionRecord.java Thu Jul 27 12:58:49 2017 +0200 @@ -13,8 +13,8 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class MetricsCollectionRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = 6439905265096882126L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]}}}]}"); + private static final long serialVersionUID = 8919981058947259891L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]}}}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } private static SpecificData MODEL$ = new SpecificData();
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/Reporter.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/Reporter.java Thu Jul 27 12:58:49 2017 +0200 @@ -8,7 +8,7 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public interface Reporter { - public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Reporter\",\"namespace\":\"com.passus.st.reporter.protocol\",\"types\":[{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":\"HttpHeader\"},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]},{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":\"MetricRecord\"}}]}],\"messages\":{\"handleMetric\":{\"request\":[{\"name\":\"metric\",\"type\":\"MetricRecord\"}],\"response\":\"string\"},\"handleMetricsCollection\":{\"request\":[{\"name\":\"collection\",\"type\":\"MetricsCollectionRecord\"}],\"response\":\"string\"}}}"); + public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Reporter\",\"namespace\":\"com.passus.st.reporter.protocol\",\"types\":[{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":\"HttpHeader\"},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]},{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":\"MetricRecord\"}}]}],\"messages\":{\"handleMetric\":{\"request\":[{\"name\":\"metric\",\"type\":\"MetricRecord\"}],\"response\":\"string\"},\"handleMetricsCollection\":{\"request\":[{\"name\":\"collection\",\"type\":\"MetricsCollectionRecord\"}],\"response\":\"string\"}}}"); /** */ java.lang.CharSequence handleMetric(com.passus.st.reporter.protocol.MetricRecord metric) throws org.apache.avro.AvroRemoteException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java Thu Jul 27 12:58:49 2017 +0200 @@ -1,10 +1,12 @@ package com.passus.st.reporter.server; import com.passus.commons.metric.Metric; +import com.passus.commons.metric.MetricsCollection; import com.passus.commons.service.Service; import com.passus.commons.utils.SimpleThreadFactory; import com.passus.st.reporter.MetricConverter; import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; import com.passus.st.reporter.protocol.Reporter; import java.io.IOException; import java.net.InetSocketAddress; @@ -27,7 +29,7 @@ private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class); - private final BlockingQueue<Metric> queue = new ArrayBlockingQueue<>(4096); + private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096); private final MetricConverter converter = new MetricConverter(); private final InetSocketAddress serverAddress; private volatile boolean working; @@ -37,9 +39,9 @@ this.serverAddress = serverAddress; } - public void send(Metric metric) { - if (!queue.offer(metric)) { - LOGGER.debug("Could not enqueue metric."); + public void send(Object object) { + if (!queue.offer(object)) { + LOGGER.debug("Could not enqueue message."); } } @@ -71,7 +73,7 @@ sender = null; } } - + public void waitForEmptyQueue() throws InterruptedException { while (queue.isEmpty() == false) { Thread.sleep(100); @@ -123,13 +125,29 @@ public void run() { while (working) { try { - Metric metric = queue.take(); - MetricRecord record = converter.convert(metric); + // TODO: refactor + Object metric = queue.take(); + Object record; + if (metric instanceof Metric) { + record = converter.convert((Metric) metric, false); + } else if (metric instanceof MetricsCollection) { + record = converter.convert((MetricsCollection) metric); + } else { + continue; + } + if (!isConnected()) { connect(); } if (isConnected()) { - CharSequence result = proxy.handleMetric(record); + CharSequence result; + if (record instanceof MetricRecord) { + result = proxy.handleMetric((MetricRecord) record); + } else if (record instanceof MetricsCollectionRecord) { + result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); + } else { + continue; + } LOGGER.trace("result: {}", result); } @@ -139,6 +157,8 @@ LOGGER.warn("Could not send.", ex); } catch (IOException ex) { LOGGER.warn("Could not connect.", ex); + } catch (Exception ex) { + LOGGER.error(ex); } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java Thu Jul 27 12:58:49 2017 +0200 @@ -5,6 +5,7 @@ import com.passus.st.reporter.protocol.Reporter; import java.io.IOException; import java.io.PrintWriter; +import java.util.List; import java.util.Map; import org.apache.avro.AvroRemoteException; import org.apache.avro.util.Utf8; @@ -17,23 +18,26 @@ private final PrintWriter reqFile; private final PrintWriter respFile; + private final PrintWriter emitterFile; private boolean verbose; public ReporterImpl() throws IOException { reqFile = new PrintWriter("requests.csv", "UTF-8"); respFile = new PrintWriter("responses.csv", "UTF-8"); + emitterFile = new PrintWriter("emitter.csv", "UTF-8"); } public void close() { reqFile.close(); respFile.close(); + emitterFile.close(); } public void setVerbose(boolean verbose) { this.verbose = verbose; } - private void addValue(StringBuilder builder, Object value) { + private static void addValue(StringBuilder builder, Object value) { if (value != null) { builder.append('"'); builder.append(value.toString()); @@ -42,22 +46,14 @@ builder.append(';'); } - private String getHeader(Map<CharSequence, CharSequence> headers, String header) { + private static String getMapValue(Map<CharSequence, CharSequence> map, String header) { CharSequence result = null; - if (headers != null) { - result = headers.get(new Utf8(header)); + if (map != null) { + result = map.get(new Utf8(header)); } return result == null ? null : result.toString(); } - private String getMisc(Map<CharSequence, CharSequence> misc, String miscKey) { - CharSequence result = null; - if (misc != null) { - result = misc.get(new Utf8(miscKey)); - } - return result == null ? null : result.toString(); - } - @Override public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException { StringBuilder builder = new StringBuilder(); @@ -80,10 +76,10 @@ addValue(builder, fields.get(new Utf8("messageHeaderSize"))); addValue(builder, fields.get(new Utf8("messageContentSize"))); Map<CharSequence, CharSequence> heads = (Map<CharSequence, CharSequence>) fields.get(new Utf8("headers")); - addValue(builder, getHeader(heads, "User-Agent")); + addValue(builder, getMapValue(heads, "User-Agent")); Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc")); - addValue(builder, getHeader(misc, "sessionId")); - addValue(builder, getHeader(misc, "username")); + addValue(builder, getMapValue(misc, "sessionId")); + addValue(builder, getMapValue(misc, "username")); reqFile.println(builder.toString()); reqFile.flush(); System.out.println(builder.toString()); @@ -95,7 +91,7 @@ addValue(builder, fields.get(new Utf8("timeStart"))); addValue(builder, fields.get(new Utf8("timeStop"))); Map<CharSequence, CharSequence> heads = (Map<CharSequence, CharSequence>) fields.get(new Utf8("headers")); - addValue(builder, getHeader(heads, "Content-Type")); + addValue(builder, getMapValue(heads, "Content-Type")); // addValue(builder, fields.get(new Utf8("clientIp"))); // addValue(builder, fields.get(new Utf8("clientPort"))); // addValue(builder, fields.get(new Utf8("serverIp"))); @@ -120,7 +116,32 @@ @Override public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException { + System.out.println(collection); + StringBuilder builder = new StringBuilder(); + + addValue(builder, collection.getStartTimestamp()); + addValue(builder, collection.getEndTimestamp()); + + MetricRecord emitterMetric = findMetric(collection, "emitter"); + if (emitterMetric != null) { + Map<CharSequence, Object> emitterFields = emitterMetric.getFields(); + addValue(builder, emitterFields.get(new Utf8("receivedBytes"))); + addValue(builder, emitterFields.get(new Utf8("sentBytes"))); + } + + emitterFile.println(builder.toString()); + emitterFile.flush(); + return "OK"; } + private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) { + List<MetricRecord> metrics = collection.getMetrics(); + for (MetricRecord metric : metrics) { + if (metric.getCode().toString().equals(name)) { + return metric; + } + } + return null; + } }
--- a/stress-tester/src/main/java/com/passus/st/Main.java Wed Jul 26 13:38:35 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Thu Jul 27 12:58:49 2017 +0200 @@ -12,6 +12,7 @@ import com.passus.st.client.http.DumperHttpClientListener; import com.passus.st.client.http.HttpClient; import com.passus.st.client.http.HttpReporterClientListener; +import com.passus.st.client.http.HttpReporterMetricHandler; import com.passus.st.client.http.SummaryHttpClientListener; import com.passus.st.client.http.WriterHttpClientListener; import com.passus.st.client.http.filter.HttpFiltersConfigurator; @@ -279,7 +280,7 @@ reporterClient.start(); client.addListener(new HttpReporterClientListener(reporterClient)); -// collector.addHandler(new HttpReporterMetricHandler(reporterClient)); + collector.addHandler(new HttpReporterMetricHandler(reporterClient)); } client.start();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java Thu Jul 27 12:58:49 2017 +0200 @@ -0,0 +1,24 @@ +package com.passus.st.client.http; + +import com.passus.commons.metric.MetricsCollection; +import com.passus.st.metric.MetricsCollectionHandler; +import com.passus.st.reporter.server.ReporterClient; + +/** + * + * @author mikolaj.podbielski + */ +public class HttpReporterMetricHandler implements MetricsCollectionHandler { + + private final ReporterClient reporterClient; + + public HttpReporterMetricHandler(ReporterClient reporterClient) { + this.reporterClient = reporterClient; + } + + @Override + public void handle(MetricsCollection mc) { + reporterClient.send(mc); + } + +}