changeset 1055:2f945e8445a0

AbstractFlowHandler
author Devel 2
date Wed, 15 Apr 2020 11:46:21 +0200
parents a8dda9af38cf
children c0ada9453f53
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java
diffstat 3 files changed, 85 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Wed Apr 15 11:46:21 2020 +0200
@@ -0,0 +1,49 @@
+package com.passus.st.client;
+
+import com.passus.commons.metric.Metric;
+import com.passus.st.metric.MetricsContainer;
+
+public abstract class AbstractFlowHandler<T extends Metric> implements FlowHandler {
+
+    protected boolean collectMetrics = false;
+
+    protected T metric;
+
+    protected abstract T createMetric();
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+        if (collectMetrics) {
+            metric = createMetric();
+            synchronized (metric) {
+                metric.activate();
+            }
+        } else {
+            synchronized (metric) {
+                metric.deactivate();
+            }
+            metric = null;
+        }
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics) {
+            synchronized (metric) {
+                container.update(metric);
+                metric.reset();
+            }
+        }
+    }
+
+    @Override
+    public void init(FlowContext flowContext) {
+
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Wed Apr 15 11:08:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Wed Apr 15 11:46:21 2020 +0200
@@ -5,15 +5,14 @@
 import com.passus.commons.time.TimeGenerator;
 import com.passus.net.dns.Dns;
 import com.passus.net.dns.DnsQuery;
+import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.FlowHandler;
 import com.passus.st.client.FlowHandlerDataDecoder;
 import com.passus.st.client.FlowHandlerDataEncoder;
-import com.passus.st.metric.MetricsContainer;
 
 import static com.passus.st.Protocols.DNS;
 
-public class DnsFlowHandler implements FlowHandler, TimeAware {
+public class DnsFlowHandler extends AbstractFlowHandler<DnsMetric> implements TimeAware {
 
     private final DnsFlowHandlerDataDecoder decoder;
 
@@ -36,44 +35,8 @@
     }
 
     @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        this.collectMetrics = collectMetrics;
-        if (collectMetrics) {
-            metric = new DnsMetric();
-            synchronized (metric) {
-                metric.activate();
-            }
-        } else {
-            synchronized (metric) {
-                metric.deactivate();
-            }
-            metric = null;
-        }
-    }
-
-    @Override
-    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
-        return decoder;
-    }
-
-    @Override
-    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
-        return encoder;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        if (collectMetrics) {
-            synchronized (metric) {
-                container.update(metric);
-                metric.reset();
-            }
-        }
+    protected DnsMetric createMetric() {
+        return new DnsMetric();
     }
 
     @Override
@@ -88,6 +51,16 @@
     }
 
     @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return decoder;
+    }
+
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
+    @Override
     public void onRequestSent(Object request, FlowContext flowContext) {
         if (collectMetrics) {
             synchronized (metric) {
@@ -115,10 +88,4 @@
         }
     }
 
-    @Override
-    public void init(FlowContext flowContext) {
-
-    }
-
-
 }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Wed Apr 15 11:08:13 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Wed Apr 15 11:46:21 2020 +0200
@@ -3,12 +3,17 @@
 import com.passus.commons.Assert;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
-import com.passus.st.client.*;
-import com.passus.st.metric.MetricsContainer;
+import com.passus.net.pgsql.PgSqlMessage;
+import com.passus.net.pgsql.PgSqlMessageType;
+import com.passus.net.pgsql.PgSqlSimpleQueryMessage;
+import com.passus.st.client.AbstractFlowHandler;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.NETFLOW;
 
-public class PgSqlFlowHandlerNetflowFlowHandler implements FlowHandler, TimeAware {
+public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric> implements TimeAware {
 
     private final PgSqlFlowHandlerDataEncoder encoder;
 
@@ -16,6 +21,8 @@
 
     boolean collectMetrics = false;
 
+    private PgSqlMetric metric;
+
     public PgSqlFlowHandlerNetflowFlowHandler() {
         this.encoder = new PgSqlFlowHandlerDataEncoder();
     }
@@ -26,13 +33,8 @@
     }
 
     @Override
-    public boolean isCollectMetrics() {
-        return collectMetrics;
-    }
-
-    @Override
-    public void setCollectMetrics(boolean collectMetrics) {
-        this.collectMetrics = collectMetrics;
+    protected PgSqlMetric createMetric() {
+        return new PgSqlMetric();
     }
 
     @Override
@@ -46,8 +48,16 @@
     }
 
     @Override
-    public void writeMetrics(MetricsContainer container) {
-
+    public void onRequestSent(Object request, FlowContext flowContext) {
+        PgSqlMessage msg = (PgSqlMessage) request;
+        if (collectMetrics) {
+            if (msg.getType() == PgSqlMessageType.SIMPLE_QUERY) {
+                PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) msg;
+                synchronized (metric) {
+                    metric.addQuery(simpleQueryMsg.getQuery());
+                }
+            }
+        }
     }
 
     @Override