changeset 1064:2ad91607a199

MySql integration in progress
author Devel 2
date Thu, 23 Apr 2020 14:17:44 +0200
parents 6fcaacd4fa38
children 53e6b033a0d2
files stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlMetric.java stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/utils/EventUtils.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/resources/pcap/mysql/mysql_session.pcap
diffstat 14 files changed, 300 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Thu Apr 23 14:17:44 2020 +0200
@@ -31,7 +31,7 @@
             case PGSQL:
                 return "PgSql";
             case MYSQL:
-                return "Mysql";
+                return "MySql";
             default:
                 return "unknown (" + protocolId + ")";
         }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Thu Apr 23 14:17:44 2020 +0200
@@ -3,10 +3,12 @@
 import com.passus.net.dns.DnsRecord;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
+import com.passus.net.mysql.MySqlPacket;
 import com.passus.net.netflow.Netflow;
 import com.passus.net.pgsql.PgSqlMessage;
 import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
+import com.passus.st.client.mysql.MySqlFlowHandler;
 import com.passus.st.client.netflow.NetflowFlowHandler;
 import com.passus.st.client.pgsql.PgSqlFlowHandler;
 
@@ -25,6 +27,8 @@
                 return new NetflowFlowHandler();
             case PGSQL:
                 return new PgSqlFlowHandler();
+            case MYSQL:
+                return new MySqlFlowHandler();
 
         }
 
@@ -45,6 +49,8 @@
             return new NetflowFlowHandler();
         } else if (message instanceof PgSqlMessage) {
             return new PgSqlFlowHandler();
+        } else if (message instanceof MySqlPacket) {
+            return new MySqlFlowHandler();
         }
 
         throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'.");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,97 @@
+package com.passus.st.client.mysql;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.net.mysql.MySqlErrorResponse;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlPacketTypes;
+import com.passus.net.mysql.MySqlQueryCommand;
+import com.passus.st.client.AbstractFlowHandler;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.pgsql.PgSqlFlowHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.Protocols.NETFLOW;
+
+public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
+
+    private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class);
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    public MySqlFlowHandler() {
+        super(new MySqlFlowHandlerDataEncoder(), new MySqlFlowHandlerDataDecoder());
+    }
+
+    @Override
+    public int getProtocolId() {
+        return NETFLOW;
+    }
+
+    @Override
+    protected MySqlMetric createMetric() {
+        return new MySqlMetric();
+    }
+
+    @Override
+    protected void onRequestSent0(MySqlPacket req, FlowContext flowContext) {
+        if (collectMetrics) {
+            if (req.getPacketType() == MySqlPacketTypes.QUERY_COMMAND) {
+                MySqlQueryCommand queryCmd = (MySqlQueryCommand) req;
+                synchronized (metric) {
+                    metric.addQuery(queryCmd.getQuery());
+                }
+            }
+        }
+    }
+
+    private void disconnectAndBlock(FlowContext flowContext) {
+        try {
+            flowContext.block();
+            flowContext.channelContext().close();
+        } catch (Exception ignore) {
+
+        }
+    }
+
+    @Override
+    protected void onResponseReceived0(MySqlPacket resp, FlowContext flowContext) {
+        if (resp.getPacketType() == MySqlPacketTypes.ERROR_RESPONSE) {
+            MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp;
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.addErrorCode(errorRsp.getErrorCode());
+                }
+            }
+
+            MySqlPacket req = (MySqlPacket) flowContext.sentRequest();
+            if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("MySql auth failed. Server message " + errorRsp);
+                }
+
+                disconnectAndBlock(flowContext);
+            }
+        }
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public void init(FlowContext flowContext) {
+        flowContext.setBidirectional(true);
+    }
+
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,58 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+
+public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<MySqlPacket> {
+
+    private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class);
+
+    private MySqlResponseDecoder decoder = new MySqlResponseDecoder();
+
+    public MySqlFlowHandlerDataDecoder() {
+
+    }
+
+    @Override
+    public MySqlPacket getResult() {
+        return decoder.getResult();
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        int res = decoder.decode(buffer);
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            long now = System.currentTimeMillis();
+            if (LOGGER.isDebugEnabled()) {
+                debug(LOGGER, flowContext,
+                        "Response decoded (size: {} B, downloaded: {} ms). {}",
+                        res,
+                        now - flowContext.receivedStartTimestamp(),
+                        decoder.getResult()
+                );
+            }
+        }
+
+        return res;
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,18 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.mysql.MySqlEncoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+public class MySqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<MySqlPacket> {
+
+    private final MySqlEncoder encoder = new MySqlEncoder();
+
+    @Override
+    public void encode(MySqlPacket request, FlowContext flowContext, ByteBuff out) {
+        encoder.encode(request, out);
+    }
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlMetric.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,45 @@
+package com.passus.st.client.mysql;
+
+import com.passus.commons.metric.Metric;
+import com.passus.st.client.DbMetric;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.HashMap;
+
+public class MySqlMetric extends DbMetric {
+
+    public static final String DEFAULT_NAME = "MySql";
+
+    private final HashMap<Integer, MutableInt> errorCodesCount = new HashMap<>();
+
+    public MySqlMetric() {
+        this(DEFAULT_NAME);
+    }
+
+    public MySqlMetric(String name) {
+        super(name);
+        attrs.put("errorCodes", errorCodesCount);
+    }
+
+    public void addErrorCode(int status) {
+        Metric.incrementCountMap(status, errorCodesCount);
+    }
+
+    @Override
+    public void update(Metric metric) {
+        super.update(metric);
+        MySqlMetric dbMetric = (MySqlMetric) metric;
+        Metric.updateCountMap(errorCodesCount, dbMetric.errorCodesCount);
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        errorCodesCount.clear();
+    }
+
+    @Override
+    public String extractStatementType(String query) {
+        return null;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.mysql.MySqlDecoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlRequestDecoder;
+import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.source.NcPayloadReader;
+
+import java.io.IOException;
+
+public class NcMySqlPayloadReader extends NcPayloadReader<MySqlPacket, MySqlPacket> {
+
+    private final MySqlRequestDecoder decoderClient = new MySqlRequestDecoder();
+
+    private final MySqlResponseDecoder decoderServer = new MySqlResponseDecoder();
+
+    private MySqlPacket decode(ByteBuff payload, MySqlDecoder decoder) throws IOException {
+        try {
+            int res = decoder.decode(payload);
+            payload.skipBytes(res);
+            if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                return decoder.getResult();
+            }
+            throw new IOException("Invalid PgSql data.");
+        } finally {
+            decoder.clear();
+        }
+    }
+
+    @Override
+    protected MySqlPacket decodeRequest(ByteBuff payload) throws IOException {
+        return decode(payload, decoderClient);
+    }
+
+    @Override
+    protected MySqlPacket decodeResponse(ByteBuff payload) throws IOException {
+        return decode(payload, decoderServer);
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java	Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,13 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.mysql.MySqlEncoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.source.AbstractNcPayloadWriter;
+
+public class NcMySqlPayloadWriter extends AbstractNcPayloadWriter<MySqlPacket, MySqlPacket> {
+
+    public NcMySqlPayloadWriter() {
+        super(new MySqlEncoder(), new MySqlEncoder());
+    }
+
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java	Thu Apr 23 14:17:44 2020 +0200
@@ -8,6 +8,6 @@
 
     public NcPgSqlPayloadWriter() {
         super(new PgSqlEncoder(), new PgSqlEncoder());
+    }
 
-    }
 }
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Thu Apr 23 14:17:44 2020 +0200
@@ -39,6 +39,8 @@
 
     private final NcPgSqlPayloadWriter pgSqlWriter = new NcPgSqlPayloadWriter();
 
+    private final NcMySqlPayloadWriter mySqlWriter = new NcMySqlPayloadWriter();
+
     private boolean allowOverwrite;
 
     public NcEventDestination() {
@@ -134,6 +136,9 @@
             case PGSQL:
                 pgSqlWriter.write(event, writer);
                 break;
+            case MYSQL:
+                mySqlWriter.write(event, writer);
+                break;
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Thu Apr 23 14:17:44 2020 +0200
@@ -60,6 +60,8 @@
 
     private NcPgSqlPayloadReader pgSqlReader = new NcPgSqlPayloadReader();
 
+    private NcMySqlPayloadReader mySqlSqlReader = new NcMySqlPayloadReader();
+
     private String name = UniqueIdGenerator.generate();
 
     private ReaderThread readerThread;
@@ -258,6 +260,9 @@
                     case PGSQL:
                         messages = pgSqlReader.read(payload);
                         break;
+                    case MYSQL:
+                        messages = mySqlSqlReader.read(payload);
+                        break;
                     default:
                         if (LOGGER.isDebugEnabled()) {
                             LOGGER.debug("Not supported protocol {}.", payloadBlock.proto());
--- a/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java	Thu Apr 23 14:17:44 2020 +0200
@@ -5,6 +5,7 @@
 import com.passus.net.PortRangeSet;
 import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
 import com.passus.net.http.session.HttpSessionAnalyzer;
+import com.passus.net.mysql.MySqlTcpSessionAnalyzer;
 import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
 import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer;
 import com.passus.net.session.SessionAnalyzer;
@@ -82,6 +83,12 @@
                 src.addAnalyzer(analyzer);
             }
 
+            if (protocols.contains(Protocols.MYSQL)) {
+                SessionAnalyzer analyzer = new MySqlTcpSessionAnalyzer();
+                analyzer.getPortsRange().add(3306);
+                src.addAnalyzer(analyzer);
+            }
+
             src.setLoops(1);
 
             ArrayListEventHandler handler = new ArrayListEventHandler();
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Mon Apr 20 15:44:07 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Thu Apr 23 14:17:44 2020 +0200
@@ -27,7 +27,7 @@
 
     private FileEvents writeEvents(String pcapFile) throws IOException {
         PortRangeSet portRanges = new PortRangeSet();
-        portRanges.add(4214).add(8080).add(53).add(2055).add(5432);
+        portRanges.add(4214).add(8080).add(53).add(2055).add(5432).add(3306);
         Map<String, Object> props = new HashMap<>();
         props.put("allowPartialSession", true);
         props.put("ports", portRanges);
@@ -53,7 +53,8 @@
                 {"pcap/http/http_1.pcap", HTTP},
                 {"pcap/dns/dns_A_req_resp.pcap", DNS},
                 {"pcap/netflow/netflow_v5.pcap", NETFLOW},
-                {"pcap/pgsql/pgsql_session.pcap", PGSQL}
+                {"pcap/pgsql/pgsql_session.pcap", PGSQL},
+                {"pcap/mysql/mysql_session.pcap", MYSQL}
         };
     }
 
Binary file stress-tester/src/test/resources/pcap/mysql/mysql_session.pcap has changed