changeset 1098:0724cf025128

PacketsBulk added
author Devel 2
date Tue, 12 May 2020 10:02:17 +0200
parents 29ccfdd52055
children b89bae260cb5
files stress-tester/src/main/java/com/passus/st/PacketsBulk.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java stress-tester/src/test/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoderTest.java
diffstat 7 files changed, 59 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/PacketsBulk.java	Tue May 12 10:02:17 2020 +0200
@@ -0,0 +1,17 @@
+package com.passus.st;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PacketsBulk<T> {
+
+    public final int protocol;
+
+    public final List<T> packets;
+
+    public PacketsBulk(int protocol) {
+        this.protocol = protocol;
+        this.packets = new ArrayList<>();
+    }
+    
+}
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Tue May 12 10:02:17 2020 +0200
@@ -4,6 +4,7 @@
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
 import com.passus.net.mysql.*;
+import com.passus.st.PacketsBulk;
 import com.passus.st.client.*;
 import com.passus.st.client.pgsql.PgSqlFlowHandler;
 import org.apache.logging.log4j.LogManager;
@@ -14,7 +15,7 @@
 import static com.passus.net.mysql.MySqlPacketTypes.ROW_RESPONSE;
 import static com.passus.st.Protocols.NETFLOW;
 
-public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, List<MySqlPacket>> implements TimeAware {
+public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, PacketsBulk<MySqlPacket>> implements TimeAware {
 
     private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class);
 
@@ -32,7 +33,7 @@
     }
 
     @Override
-    protected FlowHandlerDataDecoder<List<MySqlPacket>> createDecoder() {
+    protected FlowHandlerDataDecoder<PacketsBulk<MySqlPacket>> createDecoder() {
         return new MySqlFlowHandlerDataDecoder(context);
     }
 
@@ -80,9 +81,9 @@
     }
 
     @Override
-    protected void onResponseReceived0(List<MySqlPacket> resps, FlowContext flowContext) {
-        if (resps.size() == 1) {
-            MySqlPacket resp = resps.get(0);
+    protected void onResponseReceived0(PacketsBulk<MySqlPacket> bulk, FlowContext flowContext) {
+        if (bulk.packets.size() == 1) {
+            MySqlPacket resp = bulk.packets.get(0);
             if (resp.getPacketType() == MySqlPacketTypes.ERROR_RESPONSE) {
                 MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp;
                 if (collectMetrics) {
@@ -102,7 +103,7 @@
             }
         } else {
             if (collectMetrics) {
-                for (MySqlPacket resp : resps) {
+                for (MySqlPacket resp : bulk.packets) {
                     if (resp.getPacketType() == ROW_RESPONSE) {
                         synchronized (metric) {
                             metric.incRecordsNum();
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Tue May 12 10:02:17 2020 +0200
@@ -4,17 +4,15 @@
 import com.passus.data.DataDecoder;
 import com.passus.net.mysql.MySqlDecoderContext;
 import com.passus.net.mysql.MySqlPacket;
-import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.PacketsBulk;
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.FlowHandlerDataDecoder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.List;
-
 import static com.passus.st.client.FlowUtils.debug;
 
-public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<List<MySqlPacket>> {
+public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<PacketsBulk<MySqlPacket>> {
 
     private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class);
 
@@ -25,7 +23,7 @@
     }
 
     @Override
-    public List<MySqlPacket> getResult() {
+    public PacketsBulk<MySqlPacket> getResult() {
         return decoder.getResult();
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoder.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoder.java	Tue May 12 10:02:17 2020 +0200
@@ -9,18 +9,18 @@
 import com.passus.net.mysql.MySqlDecoderContext;
 import com.passus.net.mysql.MySqlPacket;
 import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.PacketsBulk;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.List;
+import static com.passus.st.Protocols.MYSQL;
 
-public class MySqlResponsePacketsDecoder extends AbstractDataDecoder<List<MySqlPacket>> {
+public class MySqlResponsePacketsDecoder extends AbstractDataDecoder<PacketsBulk<MySqlPacket>> {
 
     private static final Logger LOGGER = LogManager.getLogger(MySqlResponsePacketsDecoder.class);
 
-    private final List<MySqlPacket> packets;
+    private final PacketsBulk<MySqlPacket> bulk;
 
     private final MySqlDecoderContext context;
 
@@ -29,18 +29,18 @@
     public MySqlResponsePacketsDecoder(MySqlDecoderContext context) {
         this.context = context;
         this.pdu = new MySqlPdu(context);
-        this.packets = new ArrayList<>();
+        this.bulk = new PacketsBulk<>(MYSQL);
     }
 
     @Override
-    public List<MySqlPacket> getResult() {
-        return packets;
+    public PacketsBulk<MySqlPacket> getResult() {
+        return bulk;
     }
 
     @Override
     public void clear() {
         pdu.pdu.clear();
-        packets.clear();
+        bulk.packets.clear();
     }
 
     @Override
@@ -83,7 +83,7 @@
                 if (decoder.state() == MySqlDecoder.STATE_FINISHED) {
                     MySqlPacket packet = decoder.getResult();
                     if (packet != null) {
-                        packets.add(packet);
+                        bulk.packets.add(packet);
                         if (context.stage() == MySqlDecoderContext.STAGE_REQUEST
                                 || context.stage() == MySqlDecoderContext.STAGE_NONE) {
                             state(STATE_FINISHED);
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java	Tue May 12 10:02:17 2020 +0200
@@ -1,34 +1,35 @@
 package com.passus.st.client.mysql.filter;
 
 import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.PacketsBulk;
 import com.passus.st.client.FlowContext;
 import com.passus.st.filter.FlowFilter;
 
-import java.util.List;
+import static com.passus.st.Protocols.MYSQL;
 
 public abstract class MySqlFilter implements FlowFilter {
 
     @Override
     public int filterInbound(Object req, Object resp, FlowContext context) {
-        if (req instanceof MySqlPacket || resp instanceof List) {
-            return filterInbound((MySqlPacket) req, (List<MySqlPacket>) resp, context);
+        if (req instanceof MySqlPacket || (resp instanceof PacketsBulk && ((PacketsBulk) resp).protocol == MYSQL)) {
+            return filterInbound((MySqlPacket) req, (PacketsBulk<MySqlPacket>) resp, context);
         }
         return DUNNO;
     }
 
-    public int filterInbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) {
+    public int filterInbound(MySqlPacket req, PacketsBulk<MySqlPacket> resp, FlowContext context) {
         return DUNNO;
     }
 
     @Override
     public int filterOutbound(Object req, Object resp, FlowContext context) {
-        if (req instanceof MySqlPacket || resp instanceof List) {
-            return filterOutbound((MySqlPacket) req, (List<MySqlPacket>) resp, context);
+        if (req instanceof MySqlPacket || (resp instanceof PacketsBulk && ((PacketsBulk) resp).protocol == MYSQL)) {
+            return filterOutbound((MySqlPacket) req, (PacketsBulk<MySqlPacket>) resp, context);
         }
         return DUNNO;
     }
 
-    public int filterOutbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) {
+    public int filterOutbound(MySqlPacket req, PacketsBulk<MySqlPacket> resp, FlowContext context) {
         return DUNNO;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Tue May 12 10:02:17 2020 +0200
@@ -10,6 +10,7 @@
 import com.passus.net.mysql.MySqlPacket;
 import com.passus.net.mysql.MySqlPacketTypes;
 import com.passus.net.mysql.MySqlServerGreetingResponse;
+import com.passus.st.PacketsBulk;
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.credentials.Credentials;
 import com.passus.st.client.credentials.CredentialsProvider;
@@ -17,8 +18,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.List;
-
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
 import static com.passus.net.mysql.MySqlUtils.passScramble411;
 import static com.passus.st.filter.CredentialsProviderNodeDefinition.providersNodeDef;
@@ -52,9 +51,9 @@
     }
 
     @Override
-    public int filterInbound(MySqlPacket req, List<MySqlPacket> resps, FlowContext context) {
-        if (resps.size() == 1) {
-            MySqlPacket resp = resps.get(0);
+    public int filterInbound(MySqlPacket req, PacketsBulk<MySqlPacket> bulk, FlowContext context) {
+        if (bulk.packets.size() == 1) {
+            MySqlPacket resp = bulk.packets.get(0);
             if (resp.getPacketType() == MySqlPacketTypes.GREETINGS) {
                 context.setParam("mysql.greeting", resp);
             }
@@ -93,7 +92,7 @@
     }
 
     @Override
-    public int filterOutbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) {
+    public int filterOutbound(MySqlPacket req, PacketsBulk<MySqlPacket> bulk, FlowContext context) {
         if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
             MySqlServerGreetingResponse greeting = context.getParamValue("mysql.greeting");
             if (greeting != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoderTest.java	Tue May 12 09:33:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoderTest.java	Tue May 12 10:02:17 2020 +0200
@@ -3,6 +3,7 @@
 import com.passus.net.mysql.*;
 import com.passus.net.packet.Tcp;
 import com.passus.net.source.pcap.PcapUtils;
+import com.passus.st.PacketsBulk;
 import org.testng.annotations.Test;
 
 import java.util.Iterator;
@@ -28,9 +29,9 @@
         respDecoder.decode(it.next().getPayload());
         assertEquals(STATE_FINISHED, respDecoder.state());
 
-        List<MySqlPacket> packets = respDecoder.getResult();
-        assertEquals(1, packets.size());
-        assertTrue(packets.get(0) instanceof MySqlServerGreetingResponse);
+        PacketsBulk<MySqlPacket> bulk = respDecoder.getResult();
+        assertEquals(1, bulk.packets.size());
+        assertTrue(bulk.packets.get(0) instanceof MySqlServerGreetingResponse);
 
         reqDecoder.decode(it.next().getPayload());
         assertEquals(STATE_FINISHED, reqDecoder.state());
@@ -80,9 +81,9 @@
         respDecoder.decode(it.next().getPayload());
         assertEquals(STATE_FINISHED, respDecoder.state());
 
-        List<MySqlPacket> packets = respDecoder.getResult();
-        assertEquals(6, packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count());
-        assertEquals(1000, packets.stream().filter(p -> p instanceof MySqlRowResponse).count());
+        PacketsBulk<MySqlPacket> bulk = respDecoder.getResult();
+        assertEquals(6, bulk.packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count());
+        assertEquals(1000, bulk.packets.stream().filter(p -> p instanceof MySqlRowResponse).count());
 
         reqDecoder.decode(it.next().getPayload());
         assertEquals(STATE_FINISHED, reqDecoder.state());
@@ -103,8 +104,8 @@
         respDecoder.decode(it.next().getPayload());
         assertEquals(STATE_FINISHED, respDecoder.state());
 
-        List<MySqlPacket> packets = respDecoder.getResult();
-        assertEquals(5, packets.size());
-        assertEquals(4, packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count());
+        PacketsBulk<MySqlPacket> bulk = respDecoder.getResult();
+        assertEquals(5, bulk.packets.size());
+        assertEquals(4, bulk.packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count());
     }
 }
\ No newline at end of file