Mercurial > stress-tester
changeset 1064:2ad91607a199
MySql integration in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Thu Apr 23 14:17:44 2020 +0200 @@ -31,7 +31,7 @@ case PGSQL: return "PgSql"; case MYSQL: - return "Mysql"; + return "MySql"; default: return "unknown (" + protocolId + ")"; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Thu Apr 23 14:17:44 2020 +0200 @@ -3,10 +3,12 @@ import com.passus.net.dns.DnsRecord; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; +import com.passus.net.mysql.MySqlPacket; import com.passus.net.netflow.Netflow; import com.passus.net.pgsql.PgSqlMessage; import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; +import com.passus.st.client.mysql.MySqlFlowHandler; import com.passus.st.client.netflow.NetflowFlowHandler; import com.passus.st.client.pgsql.PgSqlFlowHandler; @@ -25,6 +27,8 @@ return new NetflowFlowHandler(); case PGSQL: return new PgSqlFlowHandler(); + case MYSQL: + return new MySqlFlowHandler(); } @@ -45,6 +49,8 @@ return new NetflowFlowHandler(); } else if (message instanceof PgSqlMessage) { return new PgSqlFlowHandler(); + } else if (message instanceof MySqlPacket) { + return new MySqlFlowHandler(); } throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'.");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,128 @@
+package com.passus.st.client.mysql;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.net.mysql.MySqlErrorResponse;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlPacketTypes;
+import com.passus.net.mysql.MySqlQueryCommand;
+import com.passus.st.client.AbstractFlowHandler;
+import com.passus.st.client.FlowContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.Protocols.MYSQL;
+
+/**
+ * Flow handler for MySQL traffic.
+ * <p>
+ * Records sent queries and received error codes in a {@link MySqlMetric} (when metric
+ * collection is enabled) and tears the flow down when the server rejects a login request.
+ */
+public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
+
+    // Bound to this class so MySQL log lines are not reported under the PgSql handler's logger.
+    private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandler.class);
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    /**
+     * Creates a handler that uses the MySQL-specific encoder and decoder.
+     */
+    public MySqlFlowHandler() {
+        super(new MySqlFlowHandlerDataEncoder(), new MySqlFlowHandlerDataDecoder());
+    }
+
+    /**
+     * @return the MySQL protocol identifier
+     */
+    @Override
+    public int getProtocolId() {
+        return MYSQL;
+    }
+
+    @Override
+    protected MySqlMetric createMetric() {
+        return new MySqlMetric();
+    }
+
+    /**
+     * Registers the query text of every sent query command in the metric.
+     * Other packet types are ignored; nothing happens when metrics are disabled.
+     */
+    @Override
+    protected void onRequestSent0(MySqlPacket req, FlowContext flowContext) {
+        if (!collectMetrics || req.getPacketType() != MySqlPacketTypes.QUERY_COMMAND) {
+            return;
+        }
+
+        MySqlQueryCommand queryCmd = (MySqlQueryCommand) req;
+        synchronized (metric) {
+            metric.addQuery(queryCmd.getQuery());
+        }
+    }
+
+    /**
+     * Blocks the flow and closes its channel. Failures are logged and otherwise ignored,
+     * because this is a best-effort teardown.
+     */
+    private void disconnectAndBlock(FlowContext flowContext) {
+        try {
+            flowContext.block();
+            flowContext.channelContext().close();
+        } catch (Exception e) {
+            // Do not propagate: the caller is in the middle of response handling.
+            LOGGER.debug("Unable to block and close MySql flow.", e);
+        }
+    }
+
+    /**
+     * Handles error responses: counts the error code (when metrics are enabled) and,
+     * if the error answers a login request, blocks the flow and closes the connection.
+     */
+    @Override
+    protected void onResponseReceived0(MySqlPacket resp, FlowContext flowContext) {
+        if (resp.getPacketType() != MySqlPacketTypes.ERROR_RESPONSE) {
+            return;
+        }
+
+        MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp;
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.addErrorCode(errorRsp.getErrorCode());
+            }
+        }
+
+        // NOTE(review): assumes sentRequest() may be null when the error arrives before any
+        // request was sent (e.g. the server refusing the connection) - confirm.
+        MySqlPacket req = (MySqlPacket) flowContext.sentRequest();
+        if (req != null && req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
+            LOGGER.debug("MySql auth failed. Server message {}", errorRsp);
+            disconnectAndBlock(flowContext);
+        }
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    /**
+     * @param timeGenerator the generator to use; must not be {@code null}
+     */
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+    /**
+     * Marks the flow as bidirectional.
+     */
+    @Override
+    public void init(FlowContext flowContext) {
+        flowContext.setBidirectional(true);
+    }
+
+}
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,61 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static com.passus.st.client.FlowUtils.debug;
+
+/**
+ * Adapts {@link MySqlResponseDecoder} to the {@link FlowHandlerDataDecoder} contract
+ * and emits a debug line whenever a response has been fully decoded.
+ */
+public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<MySqlPacket> {
+
+    private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class);
+
+    private MySqlResponseDecoder decoder = new MySqlResponseDecoder();
+
+    public MySqlFlowHandlerDataDecoder() {
+        // nothing to initialise
+    }
+
+    /**
+     * Feeds the buffer to the wrapped decoder and returns whatever it returns.
+     */
+    @Override
+    public int decode(ByteBuff buffer, FlowContext flowContext) {
+        final int decoded = decoder.decode(buffer);
+        final boolean finished = decoder.state() == DataDecoder.STATE_FINISHED;
+        if (!finished || !LOGGER.isDebugEnabled()) {
+            return decoded;
+        }
+
+        long downloadTime = System.currentTimeMillis() - flowContext.receivedStartTimestamp();
+        debug(LOGGER, flowContext,
+                "Response decoded (size: {} B, downloaded: {} ms). {}",
+                decoded, downloadTime, decoder.getResult());
+        return decoded;
+    }
+
+    @Override
+    public int state() {
+        return decoder.state();
+    }
+
+    @Override
+    public MySqlPacket getResult() {
+        return decoder.getResult();
+    }
+
+    @Override
+    public String getLastError() {
+        return decoder.getLastError();
+    }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,25 @@
+package com.passus.st.client.mysql;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.mysql.MySqlEncoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+/**
+ * {@link FlowHandlerDataEncoder} for MySQL packets; serialisation is delegated
+ * to a {@link MySqlEncoder}.
+ */
+public class MySqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<MySqlPacket> {
+
+    private final MySqlEncoder encoder = new MySqlEncoder();
+
+    /**
+     * Writes {@code request} into {@code out}. The flow context is not used.
+     */
+    @Override
+    public void encode(MySqlPacket request, FlowContext flowContext, ByteBuff out) {
+        this.encoder.encode(request, out);
+    }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlMetric.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,83 @@
+package com.passus.st.client.mysql;
+
+import com.passus.commons.metric.Metric;
+import com.passus.st.client.DbMetric;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Metric for MySQL flows. On top of what {@link DbMetric} provides it keeps a counter
+ * per received error code, registered under the {@code errorCodes} attribute.
+ */
+public class MySqlMetric extends DbMetric {
+
+    public static final String DEFAULT_NAME = "MySql";
+
+    private final HashMap<Integer, MutableInt> errorCodesCount = new HashMap<>();
+
+    public MySqlMetric() {
+        this(DEFAULT_NAME);
+    }
+
+    public MySqlMetric(String name) {
+        super(name);
+        attrs.put("errorCodes", errorCodesCount);
+    }
+
+    /**
+     * Increments the counter kept for the given error code.
+     */
+    public void addErrorCode(int status) {
+        Metric.incrementCountMap(status, errorCodesCount);
+    }
+
+    /**
+     * Delegates to {@code super.update} and then passes both error-code maps to
+     * {@code Metric.updateCountMap}.
+     *
+     * @param metric must be a {@link MySqlMetric}, otherwise the cast fails
+     */
+    @Override
+    public void update(Metric metric) {
+        super.update(metric);
+        MySqlMetric dbMetric = (MySqlMetric) metric;
+        Metric.updateCountMap(errorCodesCount, dbMetric.errorCodesCount);
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        errorCodesCount.clear();
+    }
+
+    /**
+     * Returns the leading keyword of the query in upper case (e.g. {@code SELECT}).
+     * <p>
+     * NOTE(review): the casing expected by {@link DbMetric} is not visible here - confirm.
+     *
+     * @param query SQL text, may be {@code null}
+     * @return the statement keyword, or {@code null} when the query does not start with one
+     */
+    @Override
+    public String extractStatementType(String query) {
+        if (query == null) {
+            return null;
+        }
+
+        int len = query.length();
+        int start = 0;
+        while (start < len && Character.isWhitespace(query.charAt(start))) {
+            start++;
+        }
+
+        int end = start;
+        while (end < len && Character.isLetter(query.charAt(end))) {
+            end++;
+        }
+
+        // No leading keyword (empty text, comment, parenthesis...): keep returning null as before.
+        return end == start ? null : query.substring(start, end).toUpperCase(Locale.ROOT);
+    }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadReader.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,52 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.mysql.MySqlDecoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.net.mysql.MySqlRequestDecoder;
+import com.passus.net.mysql.MySqlResponseDecoder;
+import com.passus.st.source.NcPayloadReader;
+
+import java.io.IOException;
+
+/**
+ * Decodes MySQL packets from payload buffers, using a request decoder for
+ * {@link #decodeRequest} and a response decoder for {@link #decodeResponse}.
+ */
+public class NcMySqlPayloadReader extends NcPayloadReader<MySqlPacket, MySqlPacket> {
+
+    private final MySqlRequestDecoder decoderClient = new MySqlRequestDecoder();
+
+    private final MySqlResponseDecoder decoderServer = new MySqlResponseDecoder();
+
+    /**
+     * Decodes one packet from {@code payload} and advances the buffer by the decoder's
+     * return value. The decoder is cleared afterwards, whether decoding succeeded or not.
+     *
+     * @throws IOException when the decoder did not reach the finished state
+     */
+    private MySqlPacket decode(ByteBuff payload, MySqlDecoder decoder) throws IOException {
+        try {
+            int res = decoder.decode(payload);
+            payload.skipBytes(res);
+            if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                return decoder.getResult();
+            }
+
+            throw new IOException("Invalid MySql data.");
+        } finally {
+            decoder.clear();
+        }
+    }
+
+    @Override
+    protected MySqlPacket decodeRequest(ByteBuff payload) throws IOException {
+        return decode(payload, decoderClient);
+    }
+
+    @Override
+    protected MySqlPacket decodeResponse(ByteBuff payload) throws IOException {
+        return decode(payload, decoderServer);
+    }
+}
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcMySqlPayloadWriter.java Thu Apr 23 14:17:44 2020 +0200
@@ -0,0 +1,17 @@
+package com.passus.st.reader.nc;
+
+import com.passus.net.mysql.MySqlEncoder;
+import com.passus.net.mysql.MySqlPacket;
+import com.passus.st.source.AbstractNcPayloadWriter;
+
+/**
+ * Payload writer for MySQL packets. Two separate {@link MySqlEncoder} instances
+ * are handed to the base class.
+ */
+public class NcMySqlPayloadWriter extends AbstractNcPayloadWriter<MySqlPacket, MySqlPacket> {
+
+    public NcMySqlPayloadWriter() {
+        super(new MySqlEncoder(), new MySqlEncoder());
+    }
+
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcPgSqlPayloadWriter.java Thu Apr 23 14:17:44 2020 +0200 @@ -8,6 +8,6 @@ public NcPgSqlPayloadWriter() { super(new PgSqlEncoder(), new PgSqlEncoder()); + } - } } \ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Thu Apr 23 14:17:44 2020 +0200 @@ -39,6 +39,8 @@ private final NcPgSqlPayloadWriter pgSqlWriter = new NcPgSqlPayloadWriter(); + private final NcMySqlPayloadWriter mySqlWriter = new NcMySqlPayloadWriter(); + private boolean allowOverwrite; public NcEventDestination() { @@ -134,6 +136,9 @@ case PGSQL: pgSqlWriter.write(event, writer); break; + case MYSQL: + mySqlWriter.write(event, writer); + break; } }
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java Thu Apr 23 14:17:44 2020 +0200 @@ -60,6 +60,8 @@ private NcPgSqlPayloadReader pgSqlReader = new NcPgSqlPayloadReader(); + private NcMySqlPayloadReader mySqlSqlReader = new NcMySqlPayloadReader(); + private String name = UniqueIdGenerator.generate(); private ReaderThread readerThread; @@ -258,6 +260,9 @@ case PGSQL: messages = pgSqlReader.read(payload); break; + case MYSQL: + messages = mySqlSqlReader.read(payload); + break; default: if (LOGGER.isDebugEnabled()) { LOGGER.debug("Not supported protocol {}.", payloadBlock.proto());
--- a/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java Thu Apr 23 14:17:44 2020 +0200 @@ -5,6 +5,7 @@ import com.passus.net.PortRangeSet; import com.passus.net.dns.session.DnsUdpSessionAnalyzer; import com.passus.net.http.session.HttpSessionAnalyzer; +import com.passus.net.mysql.MySqlTcpSessionAnalyzer; import com.passus.net.netflow.NetflowUdpSessionAnalyzer; import com.passus.net.pgsql.PgSqlTcpSessionAnalyzer; import com.passus.net.session.SessionAnalyzer; @@ -82,6 +83,12 @@ src.addAnalyzer(analyzer); } + if (protocols.contains(Protocols.MYSQL)) { + SessionAnalyzer analyzer = new MySqlTcpSessionAnalyzer(); + analyzer.getPortsRange().add(3306); + src.addAnalyzer(analyzer); + } + src.setLoops(1); ArrayListEventHandler handler = new ArrayListEventHandler();
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Mon Apr 20 15:44:07 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Thu Apr 23 14:17:44 2020 +0200 @@ -27,7 +27,7 @@ private FileEvents writeEvents(String pcapFile) throws IOException { PortRangeSet portRanges = new PortRangeSet(); - portRanges.add(4214).add(8080).add(53).add(2055).add(5432); + portRanges.add(4214).add(8080).add(53).add(2055).add(5432).add(3306); Map<String, Object> props = new HashMap<>(); props.put("allowPartialSession", true); props.put("ports", portRanges); @@ -53,7 +53,8 @@ {"pcap/http/http_1.pcap", HTTP}, {"pcap/dns/dns_A_req_resp.pcap", DNS}, {"pcap/netflow/netflow_v5.pcap", NETFLOW}, - {"pcap/pgsql/pgsql_session.pcap", PGSQL} + {"pcap/pgsql/pgsql_session.pcap", PGSQL}, + {"pcap/mysql/mysql_session.pcap", MYSQL} }; }