changeset 1065:53e6b033a0d2

MySqlPacketTypeBasedEncoder, MySqlPacketTypeBasedDecoder
author Devel 2
date Fri, 24 Apr 2020 12:32:45 +0200
parents 2ad91607a199
children bea08c5fe560
files stress-tester/src/main/java/com/passus/st/client/mysql/MySqlPacketTypeBasedDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlPacketTypeBasedEncoder.java stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/resources/pcap/mysql/mysql_session.pcap
diffstat 6 files changed, 172 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlPacketTypeBasedDecoder.java	Fri Apr 24 12:32:45 2020 +0200
@@ -0,0 +1,139 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.AbstractDataDecoder;
+import com.passus.data.DataUtils;
+import com.passus.net.mysql.*;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import static com.passus.net.mysql.MySqlPacketTypes.*;
+
+public class MySqlPacketTypeBasedDecoder extends AbstractDataDecoder<MySqlPacket> {
+
+    private MySqlRequestDecoder reqDecoder = new MySqlRequestDecoder();
+
+    private MySqlResponseDecoder respDecoder = new MySqlResponseDecoder();
+
+    private final MutableObject<MySqlRequest> reqTmp = new MutableObject<>();
+
+    private final MutableObject<MySqlResponse> respTmp = new MutableObject<>();
+
+    private MySqlPacket result = null;
+
+    @Override
+    public MySqlPacket getResult() {
+        return result;
+    }
+
+    @Override
+    public void clear() {
+        result = null;
+        reqTmp.setValue(null);
+        respTmp.setValue(null);
+        reqDecoder.clear();
+        respDecoder.clear();
+    }
+
+    public int decode(byte[] data, int offset, int length) {
+        int startOffset = offset;
+        int endOffset = offset + length;
+
+        int packetType = data[offset++];
+
+        int packetLength = DataUtils.getInt3LE(data, offset);
+        offset += 3;
+
+        if (packetLength < 0) {
+            return error("Message length < 0.");
+        } else if (length < packetLength) {
+            state(STATE_DATA_NEEDED);
+            return offset - startOffset;
+        }
+
+        int packetNum = data[offset++];
+        MySqlDecoder lastDecoder;
+        switch (packetType) {
+            case GREETINGS:
+                respDecoder.decodeServerGreetingsResp(data, offset, packetLength, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case OK_RESPONSE:
+                respDecoder.decodeOkResp(data, offset, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case ERROR_RESPONSE:
+                respDecoder.decodeErrorResp(data, offset, endOffset, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case EOF_RESPONSE:
+                respDecoder.decodeEofResp(data, offset, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case FIELDS_NUM_RESPONSE:
+                respDecoder.decodeFieldsNumResponse(data, offset, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case COLUMN_DEFINITION_RESPONSE:
+                respDecoder.decodeColumnDefResp(data, offset, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case ROW_RESPONSE:
+                respDecoder.decodeRowResp(data, offset, packetLength, respTmp);
+                lastDecoder = respDecoder;
+                break;
+            case LOGIN_REQUEST:
+                reqDecoder.decodeLoginRequest(data, offset, length, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case QUERY_COMMAND:
+                offset++;
+                reqDecoder.decodeQueryCommand(data, offset, length, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case USE_DATABASE_COMMAND:
+                offset++;
+                reqDecoder.decodeUseDatabaseCommand(data, offset, length, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case SHOW_FIELDS_COMMAND:
+                offset++;
+                reqDecoder.decodeShowFieldsCommand(data, offset, length, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case CLOSE_STATEMENT_COMMAND:
+                offset++;
+                reqDecoder.decodeStatementCloseCommand(data, offset, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case QUIT_COMMAND:
+                offset++;
+                reqDecoder.decodeQueryCommand(data, offset, packetLength, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            case GENERIC_COMMAND:
+                byte type = data[offset++];
+                reqDecoder.decodeGenericCommand(data, offset, packetLength, type, reqTmp);
+                lastDecoder = reqDecoder;
+                break;
+            default:
+                throw new RuntimeException("Not supported packet type " + packetType + ".");
+        }
+
+        MySqlPacket packet = respTmp.getValue();
+        if (packet == null) {
+            packet = reqTmp.getValue();
+        }
+
+        packet.setLength(packetLength);
+        packet.setPacketNum(packetNum);
+        if (lastDecoder.state() == STATE_ERROR) {
+            error(lastDecoder.getLastError());
+        } else {
+            result = packet;
+            state(STATE_FINISHED);
+        }
+
+        offset += packetLength;
+        return offset - startOffset;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlPacketTypeBasedEncoder.java	Fri Apr 24 12:32:45 2020 +0200
@@ -0,0 +1,17 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataEncoder;
+import com.passus.net.mysql.MySqlEncoder;
+import com.passus.net.mysql.MySqlPacket;
+
+public class MySqlPacketTypeBasedEncoder implements DataEncoder<MySqlPacket> {
+
+    private final MySqlEncoder encoder = new MySqlEncoder();
+
+    @Override
+    public void encode(MySqlPacket packet, ByteBuff out) {
+        out.append(packet.getPacketType());
+        encoder.encode(packet, out);
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java	Thu Apr 23 14:17:44 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java	Fri Apr 24 12:32:45 2020 +0200
@@ -2,28 +2,27 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
-import com.passus.net.mysql.MySqlDecoder;
 import com.passus.net.mysql.MySqlPacket;
-import com.passus.net.mysql.MySqlRequestDecoder;
-import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.client.mysql.MySqlPacketTypeBasedDecoder;
 import com.passus.st.source.NcPayloadReader;
 
 import java.io.IOException;
 
 public class NcMySqlPayloadReader extends NcPayloadReader<MySqlPacket, MySqlPacket> {
 
-    private final MySqlRequestDecoder decoderClient = new MySqlRequestDecoder();
+    private final MySqlPacketTypeBasedDecoder decoder = new MySqlPacketTypeBasedDecoder();
 
-    private final MySqlResponseDecoder decoderServer = new MySqlResponseDecoder();
-
-    private MySqlPacket decode(ByteBuff payload, MySqlDecoder decoder) throws IOException {
+    private MySqlPacket decode(ByteBuff payload) throws IOException {
         try {
             int res = decoder.decode(payload);
             payload.skipBytes(res);
             if (decoder.state() == DataDecoder.STATE_FINISHED) {
                 return decoder.getResult();
+            } else if (decoder.state() == DataDecoder.STATE_ERROR) {
+                throw new IOException("Invalid MySql data. Decoder error: " + decoder.getLastError());
+            } else {
+                throw new IOException("Invalid MySql data. More data needed.");
             }
-            throw new IOException("Invalid PgSql data.");
         } finally {
             decoder.clear();
         }
@@ -31,11 +30,11 @@
 
     @Override
     protected MySqlPacket decodeRequest(ByteBuff payload) throws IOException {
-        return decode(payload, decoderClient);
+        return decode(payload);
     }
 
     @Override
     protected MySqlPacket decodeResponse(ByteBuff payload) throws IOException {
-        return decode(payload, decoderServer);
+        return decode(payload);
     }
 }
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java	Thu Apr 23 14:17:44 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java	Fri Apr 24 12:32:45 2020 +0200
@@ -2,12 +2,13 @@
 
 import com.passus.net.mysql.MySqlEncoder;
 import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.client.mysql.MySqlPacketTypeBasedEncoder;
 import com.passus.st.source.AbstractNcPayloadWriter;
 
 public class NcMySqlPayloadWriter extends AbstractNcPayloadWriter<MySqlPacket, MySqlPacket> {
 
     public NcMySqlPayloadWriter() {
-        super(new MySqlEncoder(), new MySqlEncoder());
+        super(new MySqlPacketTypeBasedEncoder(), new MySqlPacketTypeBasedEncoder());
     }
 
 }
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Thu Apr 23 14:17:44 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 24 12:32:45 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.PooledByteBuffAllocator;
 import com.passus.net.PortRangeSet;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
@@ -25,6 +26,10 @@
  */
 public class NcEventSourceTest {
 
+    static {
+        Log4jConfigurationFactory.enableFactory("debug");
+    }
+
     private FileEvents writeEvents(String pcapFile) throws IOException {
         PortRangeSet portRanges = new PortRangeSet();
         portRanges.add(4214).add(8080).add(53).add(2055).add(5432).add(3306);
Binary file stress-tester/src/test/resources/pcap/mysql/mysql_session.pcap has changed