changeset 431:bbc6b52ab089

Reporter - MetricsCollection
author Devel 1
date Thu, 27 Jul 2017 12:58:49 +0200
parents dea663a83aaf
children 0db3ae52a2f3
files stress-tester-reporter/src/main/avro/reporter.avdl stress-tester-reporter/src/main/java/com/passus/st/reporter/MetricConverter.java stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricRecord.java stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricsCollectionRecord.java stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/Reporter.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java
diffstat 9 files changed, 158 insertions(+), 35 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester-reporter/src/main/avro/reporter.avdl	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/avro/reporter.avdl	Thu Jul 27 12:58:49 2017 +0200
@@ -8,7 +8,7 @@
 
  record MetricRecord {
   string code;
-  map<union { null, int, long, float, double, string, array<HttpHeader>, map<union {null, int, long, float, double, string}> }> fields;
+  map<union { null, int, long, float, double, string, array<HttpHeader>, map<union {null, int, long, float, double, string}>, MetricRecord }> fields;
  }
 
  record MetricsCollectionRecord {
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/MetricConverter.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/MetricConverter.java	Thu Jul 27 12:58:49 2017 +0200
@@ -1,26 +1,83 @@
 package com.passus.st.reporter;
 
 import com.passus.commons.metric.Metric;
+import com.passus.commons.metric.MetricsCollection;
 import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
 
 /**
  *
  * @author mikolaj.podbielski
  */
 public class MetricConverter {
-    
-    public MetricRecord convert(Metric metric) {
+
+    private interface Converter<I extends Object> {
+
+        Serializable convert(I input);
+    }
+
+    private final Map<Class, Converter> converters = new HashMap<>();
+
+    {
+        converters.put(MutableLong.class, (Converter<MutableLong>) MutableLong::getValue);
+        converters.put(MutableInt.class, (Converter<MutableInt>) MutableInt::getValue);
+    }
+
+    private Serializable convertIfNeeded(Object o) {
+        Converter conv = converters.get(o.getClass());
+        return conv == null ? o.toString() : conv.convert(o);
+    }
+
+    private HashMap processEntries(HashMap<Object, Object> map) {
+        try {
+            HashMap proc = map.getClass().newInstance();
+            for (Map.Entry<Object, Object> e : map.entrySet()) {
+                Object key = convertIfNeeded(e.getKey());
+                Object value = convertIfNeeded(e.getValue());
+                proc.put(key, value);
+            }
+            return proc;
+        } catch (ReflectiveOperationException ex) {
+            throw new RuntimeException("Could not instantiate " + map.getClass());
+        }
+    }
+
+    public MetricRecord convert(Metric metric, boolean procEmbeddedMetric) {
         Map<CharSequence, Object> fields = new HashMap<>();
         Set<String> names = metric.getAttributesName();
         for (String name : names) {
             Serializable value = metric.getAttributeValue(name);
-            fields.put(name, value);
+            if (value != null) {
+                Converter converter = converters.get(value.getClass());
+                if (converter != null) {
+                    value = converter.convert(value);
+                } else if (procEmbeddedMetric && value instanceof Metric) {
+                    value = convert(metric, false);
+                } else if (procEmbeddedMetric && value instanceof HashMap) {
+                    value = processEntries((HashMap) value);
+                }
+                fields.put(name, value);
+            }
         }
-        
+
         return new MetricRecord(metric.getName(), fields);
     }
+
+    private final Set<String> allowed = new HashSet<>(Arrays.asList("emitter", "pcapSource"));
+
+    public MetricsCollectionRecord convert(MetricsCollection collection) {
+        List<Metric> metrics = collection.getMetrics();
+        List<MetricRecord> records = metrics.stream().filter(m -> allowed.contains(m.getName())).map((Metric m) -> convert(m, true)).collect(Collectors.toList());
+        return new MetricsCollectionRecord(collection.getStartTimestamp(), collection.getEndTimestamp(), records);
+    }
 }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricRecord.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricRecord.java	Thu Jul 27 12:58:49 2017 +0200
@@ -13,8 +13,8 @@
 @SuppressWarnings("all")
 @org.apache.avro.specific.AvroGenerated
 public class MetricRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  private static final long serialVersionUID = 5768471237763458493L;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]}");
+  private static final long serialVersionUID = -4158323429014862621L;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]}");
   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
 
   private static SpecificData MODEL$ = new SpecificData();
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricsCollectionRecord.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/MetricsCollectionRecord.java	Thu Jul 27 12:58:49 2017 +0200
@@ -13,8 +13,8 @@
 @SuppressWarnings("all")
 @org.apache.avro.specific.AvroGenerated
 public class MetricsCollectionRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  private static final long serialVersionUID = 6439905265096882126L;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]}}}]}");
+  private static final long serialVersionUID = 8919981058947259891L;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"namespace\":\"com.passus.st.reporter.protocol\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]}},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]}}}]}");
   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
 
   private static SpecificData MODEL$ = new SpecificData();
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/Reporter.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/protocol/Reporter.java	Thu Jul 27 12:58:49 2017 +0200
@@ -8,7 +8,7 @@
 @SuppressWarnings("all")
 @org.apache.avro.specific.AvroGenerated
 public interface Reporter {
-  public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Reporter\",\"namespace\":\"com.passus.st.reporter.protocol\",\"types\":[{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":\"HttpHeader\"},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]}]}}]},{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":\"MetricRecord\"}}]}],\"messages\":{\"handleMetric\":{\"request\":[{\"name\":\"metric\",\"type\":\"MetricRecord\"}],\"response\":\"string\"},\"handleMetricsCollection\":{\"request\":[{\"name\":\"collection\",\"type\":\"MetricsCollectionRecord\"}],\"response\":\"string\"}}}");
+  public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Reporter\",\"namespace\":\"com.passus.st.reporter.protocol\",\"types\":[{\"type\":\"record\",\"name\":\"HttpHeader\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"MetricRecord\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"fields\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\",{\"type\":\"array\",\"items\":\"HttpHeader\"},{\"type\":\"map\",\"values\":[\"null\",\"int\",\"long\",\"float\",\"double\",\"string\"]},\"MetricRecord\"]}}]},{\"type\":\"record\",\"name\":\"MetricsCollectionRecord\",\"fields\":[{\"name\":\"startTimestamp\",\"type\":\"long\"},{\"name\":\"endTimestamp\",\"type\":\"long\"},{\"name\":\"metrics\",\"type\":{\"type\":\"array\",\"items\":\"MetricRecord\"}}]}],\"messages\":{\"handleMetric\":{\"request\":[{\"name\":\"metric\",\"type\":\"MetricRecord\"}],\"response\":\"string\"},\"handleMetricsCollection\":{\"request\":[{\"name\":\"collection\",\"type\":\"MetricsCollectionRecord\"}],\"response\":\"string\"}}}");
   /**
    */
   java.lang.CharSequence handleMetric(com.passus.st.reporter.protocol.MetricRecord metric) throws org.apache.avro.AvroRemoteException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java	Thu Jul 27 12:58:49 2017 +0200
@@ -1,10 +1,12 @@
 package com.passus.st.reporter.server;
 
 import com.passus.commons.metric.Metric;
+import com.passus.commons.metric.MetricsCollection;
 import com.passus.commons.service.Service;
 import com.passus.commons.utils.SimpleThreadFactory;
 import com.passus.st.reporter.MetricConverter;
 import com.passus.st.reporter.protocol.MetricRecord;
+import com.passus.st.reporter.protocol.MetricsCollectionRecord;
 import com.passus.st.reporter.protocol.Reporter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -27,7 +29,7 @@
 
     private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class);
 
-    private final BlockingQueue<Metric> queue = new ArrayBlockingQueue<>(4096);
+    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096);
     private final MetricConverter converter = new MetricConverter();
     private final InetSocketAddress serverAddress;
     private volatile boolean working;
@@ -37,9 +39,9 @@
         this.serverAddress = serverAddress;
     }
 
-    public void send(Metric metric) {
-        if (!queue.offer(metric)) {
-            LOGGER.debug("Could not enqueue metric.");
+    public void send(Object object) {
+        if (!queue.offer(object)) {
+            LOGGER.debug("Could not enqueue message.");
         }
     }
 
@@ -71,7 +73,7 @@
             sender = null;
         }
     }
-    
+
     public void waitForEmptyQueue() throws InterruptedException {
         while (queue.isEmpty() == false) {
             Thread.sleep(100);
@@ -123,13 +125,29 @@
         public void run() {
             while (working) {
                 try {
-                    Metric metric = queue.take();
-                    MetricRecord record = converter.convert(metric);
+                    // TODO: refactor
+                    Object metric = queue.take();
+                    Object record;
+                    if (metric instanceof Metric) {
+                        record = converter.convert((Metric) metric, false);
+                    } else if (metric instanceof MetricsCollection) {
+                        record = converter.convert((MetricsCollection) metric);
+                    } else {
+                        continue;
+                    }
+
                     if (!isConnected()) {
                         connect();
                     }
                     if (isConnected()) {
-                        CharSequence result = proxy.handleMetric(record);
+                        CharSequence result;
+                        if (record instanceof MetricRecord) {
+                            result = proxy.handleMetric((MetricRecord) record);
+                        } else if (record instanceof MetricsCollectionRecord) {
+                            result = proxy.handleMetricsCollection((MetricsCollectionRecord) record);
+                        } else {
+                            continue;
+                        }
                         LOGGER.trace("result: {}", result);
                     }
 
@@ -139,6 +157,8 @@
                     LOGGER.warn("Could not send.", ex);
                 } catch (IOException ex) {
                     LOGGER.warn("Could not connect.", ex);
+                } catch (Exception ex) {
+                    LOGGER.error(ex);
                 }
             }
 
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java	Thu Jul 27 12:58:49 2017 +0200
@@ -5,6 +5,7 @@
 import com.passus.st.reporter.protocol.Reporter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.List;
 import java.util.Map;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.util.Utf8;
@@ -17,23 +18,26 @@
 
     private final PrintWriter reqFile;
     private final PrintWriter respFile;
+    private final PrintWriter emitterFile;
     private boolean verbose;
 
     public ReporterImpl() throws IOException {
         reqFile = new PrintWriter("requests.csv", "UTF-8");
         respFile = new PrintWriter("responses.csv", "UTF-8");
+        emitterFile = new PrintWriter("emitter.csv", "UTF-8");
     }
 
     public void close() {
         reqFile.close();
         respFile.close();
+        emitterFile.close();
     }
 
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
 
-    private void addValue(StringBuilder builder, Object value) {
+    private static void addValue(StringBuilder builder, Object value) {
         if (value != null) {
             builder.append('"');
             builder.append(value.toString());
@@ -42,22 +46,14 @@
         builder.append(';');
     }
 
-    private String getHeader(Map<CharSequence, CharSequence> headers, String header) {
+    private static String getMapValue(Map<CharSequence, CharSequence> map, String header) {
         CharSequence result = null;
-        if (headers != null) {
-            result = headers.get(new Utf8(header));
+        if (map != null) {
+            result = map.get(new Utf8(header));
         }
         return result == null ? null : result.toString();
     }
 
-    private String getMisc(Map<CharSequence, CharSequence> misc, String miscKey) {
-        CharSequence result = null;
-        if (misc != null) {
-            result = misc.get(new Utf8(miscKey));
-        }
-        return result == null ? null : result.toString();
-    }
-     
     @Override
     public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException {
         StringBuilder builder = new StringBuilder();
@@ -80,10 +76,10 @@
             addValue(builder, fields.get(new Utf8("messageHeaderSize")));
             addValue(builder, fields.get(new Utf8("messageContentSize")));
             Map<CharSequence, CharSequence> heads = (Map<CharSequence, CharSequence>) fields.get(new Utf8("headers"));
-            addValue(builder, getHeader(heads, "User-Agent"));
+            addValue(builder, getMapValue(heads, "User-Agent"));
             Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc"));
-            addValue(builder, getHeader(misc, "sessionId"));
-            addValue(builder, getHeader(misc, "username"));
+            addValue(builder, getMapValue(misc, "sessionId"));
+            addValue(builder, getMapValue(misc, "username"));
             reqFile.println(builder.toString());
             reqFile.flush();
             System.out.println(builder.toString());
@@ -95,7 +91,7 @@
             addValue(builder, fields.get(new Utf8("timeStart")));
             addValue(builder, fields.get(new Utf8("timeStop")));
             Map<CharSequence, CharSequence> heads = (Map<CharSequence, CharSequence>) fields.get(new Utf8("headers"));
-            addValue(builder, getHeader(heads, "Content-Type"));
+            addValue(builder, getMapValue(heads, "Content-Type"));
 //            addValue(builder, fields.get(new Utf8("clientIp")));
 //            addValue(builder, fields.get(new Utf8("clientPort")));
 //            addValue(builder, fields.get(new Utf8("serverIp")));
@@ -120,7 +116,32 @@
 
     @Override
     public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException {
+        System.out.println(collection);
+        StringBuilder builder = new StringBuilder();
+
+        addValue(builder, collection.getStartTimestamp());
+        addValue(builder, collection.getEndTimestamp());
+
+        MetricRecord emitterMetric = findMetric(collection, "emitter");
+        if (emitterMetric != null) {
+            Map<CharSequence, Object> emitterFields = emitterMetric.getFields();
+            addValue(builder, emitterFields.get(new Utf8("receivedBytes")));
+            addValue(builder, emitterFields.get(new Utf8("sentBytes")));
+        }
+
+        emitterFile.println(builder.toString());
+        emitterFile.flush();
+
         return "OK";
     }
 
+    private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) {
+        List<MetricRecord> metrics = collection.getMetrics();
+        for (MetricRecord metric : metrics) {
+            if (metric.getCode().toString().equals(name)) {
+                return metric;
+            }
+        }
+        return null;
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Wed Jul 26 13:38:35 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Thu Jul 27 12:58:49 2017 +0200
@@ -12,6 +12,7 @@
 import com.passus.st.client.http.DumperHttpClientListener;
 import com.passus.st.client.http.HttpClient;
 import com.passus.st.client.http.HttpReporterClientListener;
+import com.passus.st.client.http.HttpReporterMetricHandler;
 import com.passus.st.client.http.SummaryHttpClientListener;
 import com.passus.st.client.http.WriterHttpClientListener;
 import com.passus.st.client.http.filter.HttpFiltersConfigurator;
@@ -279,7 +280,7 @@
                 reporterClient.start();
 
                 client.addListener(new HttpReporterClientListener(reporterClient));
-//                collector.addHandler(new HttpReporterMetricHandler(reporterClient));
+                collector.addHandler(new HttpReporterMetricHandler(reporterClient));
             }
 
             client.start();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java	Thu Jul 27 12:58:49 2017 +0200
@@ -0,0 +1,24 @@
+package com.passus.st.client.http;
+
+import com.passus.commons.metric.MetricsCollection;
+import com.passus.st.metric.MetricsCollectionHandler;
+import com.passus.st.reporter.server.ReporterClient;
+
+/**
+ *
+ * @author mikolaj.podbielski
+ */
+public class HttpReporterMetricHandler implements MetricsCollectionHandler {
+
+    private final ReporterClient reporterClient;
+
+    public HttpReporterMetricHandler(ReporterClient reporterClient) {
+        this.reporterClient = reporterClient;
+    }
+
+    @Override
+    public void handle(MetricsCollection mc) {
+        reporterClient.send(mc);
+    }
+
+}