changeset 1058:8535ce392b5e

AbstractFlowHandler enhanced
author Devel 2
date Thu, 16 Apr 2020 09:41:15 +0200
parents b3d44dd719f1
children 1d47e67bad78
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java
diffstat 5 files changed, 52 insertions(+), 88 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Thu Apr 16 09:20:53 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Thu Apr 16 09:41:15 2020 +0200
@@ -3,12 +3,55 @@
 import com.passus.commons.metric.Metric;
 import com.passus.st.metric.MetricsContainer;
 
-public abstract class AbstractFlowHandler<T extends Metric> implements FlowHandler {
+/**
+ * Base class for flow handlers: it stores the request encoder and response decoder
+ * given at construction time and forwards the untyped {@link FlowHandler} callbacks
+ * to typed hooks that subclasses may override.
+ *
+ * @param <T> metric type created by {@link #createMetric()}
+ * @param <R> request type accepted by the encoder and by {@link #onRequestSent0}
+ * @param <S> response type produced by the decoder and passed to {@link #onResponseReceived0}
+ */
+public abstract class AbstractFlowHandler<T extends Metric, R, S> implements FlowHandler {
+
+    /** Encoder returned by {@link #getRequestEncoder(FlowContext)}. */
+    private final FlowHandlerDataEncoder<R> encoder;
+
+    /** Decoder returned by {@link #getResponseDecoder(FlowContext)}; may be {@code null}. */
+    private final FlowHandlerDataDecoder<S> decoder;
 
     protected boolean collectMetrics = false;
 
     protected T metric;
 
+    /**
+     * Creates a handler that hands out the given encoder and decoder for every flow.
+     *
+     * @param encoder request encoder exposed through {@link #getRequestEncoder(FlowContext)}
+     * @param decoder response decoder exposed through {@link #getResponseDecoder(FlowContext)};
+     *                {@code null} is stored as-is (no validation is performed)
+     */
+    public AbstractFlowHandler(FlowHandlerDataEncoder<R> encoder, FlowHandlerDataDecoder<S> decoder) {
+        this.encoder = encoder;
+        this.decoder = decoder;
+    }
+
+    /**
+     * Returns the decoder supplied to the constructor; {@code flowContext} is not consulted.
+     */
+    @Override
+    public FlowHandlerDataDecoder<S> getResponseDecoder(FlowContext flowContext) {
+        return decoder;
+    }
+
+    /**
+     * Returns the encoder supplied to the constructor; {@code flowContext} is not consulted.
+     */
+    @Override
+    public FlowHandlerDataEncoder<R> getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
     protected abstract T createMetric();
 
     @Override
@@ -41,5 +84,44 @@
             }
         }
     }
-    
+
+    /**
+     * Untyped callback from {@link FlowHandler}; narrows {@code request} to {@code R}
+     * and delegates to {@link #onRequestSent0}.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // R is erased at runtime, so the cast cannot be verified here
+    public void onRequestSent(Object request, FlowContext flowContext) {
+        onRequestSent0((R) request, flowContext);
+    }
+
+    /**
+     * Typed counterpart of {@link #onRequestSent(Object, FlowContext)}. The default
+     * implementation does nothing; subclasses override it as needed.
+     *
+     * @param request     request object, cast to {@code R} by the caller
+     * @param flowContext flow context passed through unchanged from the untyped callback
+     */
+    protected void onRequestSent0(R request, FlowContext flowContext) {
+    }
+
+    /**
+     * Untyped callback from {@link FlowHandler}; narrows {@code response} to {@code S}
+     * and delegates to {@link #onResponseReceived0}.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // S is erased at runtime, so the cast cannot be verified here
+    public void onResponseReceived(Object response, FlowContext flowContext) {
+        onResponseReceived0((S) response, flowContext);
+    }
+
+    /**
+     * Typed counterpart of {@link #onResponseReceived(Object, FlowContext)}. The default
+     * implementation does nothing; subclasses override it as needed.
+     *
+     * @param response    response object, cast to {@code S} by the caller
+     * @param flowContext flow context passed through unchanged from the untyped callback
+     */
+    protected void onResponseReceived0(S response, FlowContext flowContext) {
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Thu Apr 16 09:20:53 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Thu Apr 16 09:41:15 2020 +0200
@@ -7,16 +7,10 @@
 import com.passus.net.dns.DnsQuery;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.FlowHandlerDataDecoder;
-import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.DNS;
 
-public class DnsFlowHandler extends AbstractFlowHandler<DnsMetric> implements TimeAware {
-
-    private final DnsFlowHandlerDataDecoder decoder;
-
-    private final DnsFlowHandlerDataEncoder encoder;
+public class DnsFlowHandler extends AbstractFlowHandler<DnsMetric, Dns, Dns> implements TimeAware {
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
@@ -25,8 +19,7 @@
     DnsMetric metric;
 
     public DnsFlowHandler() {
-        this.decoder = new DnsFlowHandlerDataDecoder(this);
-        this.encoder = new DnsFlowHandlerDataEncoder();
+        super(new DnsFlowHandlerDataEncoder(), new DnsFlowHandlerDataDecoder());
     }
 
     @Override
@@ -51,20 +44,9 @@
     }
 
     @Override
-    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
-        return decoder;
-    }
-
-    @Override
-    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
-        return encoder;
-    }
-
-    @Override
-    public void onRequestSent(Object request, FlowContext flowContext) {
+    protected void onRequestSent0(Dns dns, FlowContext flowContext) {
         if (collectMetrics) {
             synchronized (metric) {
-                Dns dns = (Dns) request;
                 if (dns.getQueriesCount() > 0) {
                     DnsQuery query = dns.getQueries().get(0);
                     metric.addSentRecordType(query.getType());
@@ -74,10 +56,9 @@
     }
 
     @Override
-    public void onResponseReceived(Object response, FlowContext flowContext) {
+    protected void onResponseReceived0(Dns dns, FlowContext flowContext) {
         if (collectMetrics) {
             synchronized (metric) {
-                Dns dns = (Dns) response;
                 metric.addReplyCode(dns.getReplyCode());
                 if (dns.getQueriesCount() > 0) {
                     dns.getAnswers().forEach((r) -> {
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java	Thu Apr 16 09:20:53 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java	Thu Apr 16 09:41:15 2020 +0200
@@ -16,13 +16,7 @@
     private static final Logger LOGGER = LogManager.getLogger(DnsFlowHandlerDataDecoder.class);
 
     private DnsDecoder decoder = new DnsDecoder();
-
-    private final DnsFlowHandler handler;
-
-    public DnsFlowHandlerDataDecoder(DnsFlowHandler handler) {
-        this.handler = handler;
-    }
-
+    
     @Override
     public Dns getResult() {
         return decoder.getResult();
@@ -42,7 +36,7 @@
     public int decode(ByteBuff buffer, FlowContext flowContext) {
         int res = decoder.decode(buffer);
         if (decoder.state() == DataDecoder.STATE_FINISHED) {
-            long now = handler.timeGenerator.currentTimeMillis();
+            long now = System.currentTimeMillis();
 
             if (LOGGER.isDebugEnabled()) {
                 debug(LOGGER, flowContext,
@@ -51,19 +45,6 @@
                         now - flowContext.receivedStartTimestamp()
                 );
             }
-
-            //TODO Dorobic metryki
-            /*if (client.isCollectMetrics()) {
-                synchronized (client.metric) {
-                    client.metric.incResponsesNum();
-                    client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize());
-                    client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
-                    if (lastRequest != null) {
-                        client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START));
-                    }
-                }
-            }*/
-
         }
 
         return res;
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java	Thu Apr 16 09:20:53 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java	Thu Apr 16 09:41:15 2020 +0200
@@ -6,19 +6,15 @@
 import com.passus.net.netflow.Netflow;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.FlowHandlerDataDecoder;
-import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.NETFLOW;
 
-public class NetflowFlowHandler extends AbstractFlowHandler<NetflowMetric> implements TimeAware {
-
-    private final NetflowFlowHandlerDataEncoder encoder;
+public class NetflowFlowHandler extends AbstractFlowHandler<NetflowMetric, Netflow, Void> implements TimeAware {
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
     public NetflowFlowHandler() {
-        this.encoder = new NetflowFlowHandlerDataEncoder();
+        super(new NetflowFlowHandlerDataEncoder(), null);
     }
 
     @Override
@@ -32,16 +28,6 @@
     }
 
     @Override
-    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
-        return null;
-    }
-
-    @Override
-    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
-        return encoder;
-    }
-
-    @Override
     public TimeGenerator getTimeGenerator() {
         return timeGenerator;
     }
@@ -58,9 +44,8 @@
     }
 
     @Override
-    public void onRequestSent(Object request, FlowContext flowContext) {
+    protected void onRequestSent0(Netflow netflow, FlowContext flowContext) {
         if (collectMetrics) {
-            Netflow netflow = (Netflow) request;
             synchronized (metric) {
                 metric.addNetflow(netflow);
             }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Thu Apr 16 09:20:53 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Thu Apr 16 09:41:15 2020 +0200
@@ -8,25 +8,15 @@
 import com.passus.net.pgsql.PgSqlSimpleQueryMessage;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.FlowHandlerDataDecoder;
-import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.NETFLOW;
 
-public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric> implements TimeAware {
-
-    private final PgSqlFlowHandlerDataEncoder encoder;
-    private final PgSqlFlowHandlerDataDecoder decoder;
+public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric, PgSqlMessage, PgSqlMessage> implements TimeAware {
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    boolean collectMetrics = false;
-
-    private PgSqlMetric metric;
-
     public PgSqlFlowHandlerNetflowFlowHandler() {
-        this.encoder = new PgSqlFlowHandlerDataEncoder();
-        this.decoder = new PgSqlFlowHandlerDataDecoder();
+        super(new PgSqlFlowHandlerDataEncoder(), new PgSqlFlowHandlerDataDecoder());
     }
 
     @Override
@@ -40,18 +30,7 @@
     }
 
     @Override
-    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
-        return decoder;
-    }
-
-    @Override
-    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
-        return encoder;
-    }
-
-    @Override
-    public void onRequestSent(Object request, FlowContext flowContext) {
-        PgSqlMessage msg = (PgSqlMessage) request;
+    protected void onRequestSent0(PgSqlMessage msg, FlowContext flowContext) {
         if (collectMetrics) {
             if (msg.getType() == PgSqlMessageType.SIMPLE_QUERY) {
                 PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) msg;
@@ -63,6 +42,11 @@
     }
 
     @Override
+    protected void onResponseReceived0(PgSqlMessage msg, FlowContext flowContext) {
+        // NOTE(review): channel close is disabled, leaving this override a no-op: flowContext.channelContext().close();
+    }
+
+    @Override
     public TimeGenerator getTimeGenerator() {
         return timeGenerator;
     }