changeset 1093:f48d0f6748e6

MySqlFlowHandlerDataDecoder uses MySqlResponsePacketsDecoder
author Devel 2
date Mon, 11 May 2020 12:28:21 +0200
parents 27e2ec3b81a6
children 7026f72f9bfc
files stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java
diffstat 3 files changed, 44 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Mon May 11 11:59:02 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Mon May 11 12:28:21 2020 +0200
@@ -9,9 +9,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
+
 import static com.passus.st.Protocols.NETFLOW;
 
-public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
+public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, List<MySqlPacket>> implements TimeAware {
 
     private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class);
 
@@ -29,7 +31,7 @@
     }
 
     @Override
-    protected FlowHandlerDataDecoder<MySqlPacket> createDecoder() {
+    protected FlowHandlerDataDecoder<List<MySqlPacket>> createDecoder() {
         return new MySqlFlowHandlerDataDecoder(context);
     }
 
@@ -77,22 +79,25 @@
     }
 
     @Override
-    protected void onResponseReceived0(MySqlPacket resp, FlowContext flowContext) {
-        if (resp.getPacketType() == MySqlPacketTypes.ERROR_RESPONSE) {
-            MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp;
-            if (collectMetrics) {
-                synchronized (metric) {
-                    metric.addErrorCode(errorRsp.getErrorCode());
-                }
-            }
-
-            MySqlPacket req = (MySqlPacket) flowContext.sentRequest();
-            if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("MySql auth failed. Server message " + errorRsp);
+    protected void onResponseReceived0(List<MySqlPacket> resps, FlowContext flowContext) {
+        if (resps.size() == 1) {
+            MySqlPacket resp = resps.get(0);
+            if (resp.getPacketType() == MySqlPacketTypes.ERROR_RESPONSE) {
+                MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp;
+                if (collectMetrics) {
+                    synchronized (metric) {
+                        metric.addErrorCode(errorRsp.getErrorCode());
+                    }
                 }
 
-                disconnectAndBlock(flowContext);
+                MySqlPacket req = (MySqlPacket) flowContext.sentRequest();
+                if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("MySql auth failed. Server message " + errorRsp);
+                    }
+
+                    disconnectAndBlock(flowContext);
+                }
             }
         }
     }
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Mon May 11 11:59:02 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Mon May 11 12:28:21 2020 +0200
@@ -10,20 +10,22 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
+
 import static com.passus.st.client.FlowUtils.debug;
 
-public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<MySqlPacket> {
+public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<List<MySqlPacket>> {
 
     private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class);
 
-    private final MySqlResponseDecoder decoder;
+    private final MySqlResponsePacketsDecoder decoder;
 
     public MySqlFlowHandlerDataDecoder(MySqlDecoderContext context) {
-        decoder = new MySqlResponseDecoder(context);
+        decoder = new MySqlResponsePacketsDecoder(context);
     }
 
     @Override
-    public MySqlPacket getResult() {
+    public List<MySqlPacket> getResult() {
         return decoder.getResult();
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java	Mon May 11 11:59:02 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java	Mon May 11 12:28:21 2020 +0200
@@ -4,9 +4,12 @@
 import com.passus.net.mysql.MySqlDecoderContext;
 import com.passus.net.mysql.MySqlEncoder;
 import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlPacketTypes;
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.FlowHandlerDataEncoder;
 
+import static com.passus.net.mysql.MySqlDecoderContext.*;
+
 public class MySqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<MySqlPacket> {
 
     private final MySqlEncoder encoder;
@@ -20,6 +23,20 @@
 
     @Override
     public void encode(MySqlPacket request, FlowContext flowContext, ByteBuff out) {
+        switch (request.getPacketType()) {
+            case MySqlPacketTypes.QUERY_COMMAND:
+            case MySqlPacketTypes.EXECUTE_STATEMENT_COMMAND:
+                context.stage(STAGE_RESPONSE_TABULAR);
+                break;
+            case MySqlPacketTypes.PREPARE_STATEMENT_COMMAND:
+                context.stage(STAGE_RESPONSE_PREPARE);
+                break;
+            case MySqlPacketTypes.SHOW_FIELDS_COMMAND:
+                context.stage(STAGE_RESPONSE_SHOW_FIELDS);
+            default:
+                context.stage(STAGE_REQUEST);
+        }
+
         encoder.encode(request, out);
     }
 }