Mercurial > stress-tester
changeset 1098:0724cf025128
PacketsBulk added
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/PacketsBulk.java Tue May 12 10:02:17 2020 +0200 @@ -0,0 +1,17 @@ +package com.passus.st; + +import java.util.ArrayList; +import java.util.List; + +public class PacketsBulk<T> { + + public final int protocol; + + public final List<T> packets; + + public PacketsBulk(int protocol) { + this.protocol = protocol; + this.packets = new ArrayList<>(); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Tue May 12 10:02:17 2020 +0200 @@ -4,6 +4,7 @@ import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; import com.passus.net.mysql.*; +import com.passus.st.PacketsBulk; import com.passus.st.client.*; import com.passus.st.client.pgsql.PgSqlFlowHandler; import org.apache.logging.log4j.LogManager; @@ -14,7 +15,7 @@ import static com.passus.net.mysql.MySqlPacketTypes.ROW_RESPONSE; import static com.passus.st.Protocols.NETFLOW; -public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, List<MySqlPacket>> implements TimeAware { +public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, PacketsBulk<MySqlPacket>> implements TimeAware { private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class); @@ -32,7 +33,7 @@ } @Override - protected FlowHandlerDataDecoder<List<MySqlPacket>> createDecoder() { + protected FlowHandlerDataDecoder<PacketsBulk<MySqlPacket>> createDecoder() { return new MySqlFlowHandlerDataDecoder(context); } @@ -80,9 +81,9 @@ } @Override - protected void onResponseReceived0(List<MySqlPacket> resps, FlowContext flowContext) { - if (resps.size() == 1) { - MySqlPacket resp = resps.get(0); + protected void onResponseReceived0(PacketsBulk<MySqlPacket> bulk, FlowContext flowContext) { + if (bulk.packets.size() == 1) { + MySqlPacket resp = bulk.packets.get(0); if (resp.getPacketType() == MySqlPacketTypes.ERROR_RESPONSE) { MySqlErrorResponse errorRsp = (MySqlErrorResponse) resp; if (collectMetrics) { @@ -102,7 +103,7 @@ } } else { if (collectMetrics) { - for (MySqlPacket resp : resps) { + for (MySqlPacket resp : bulk.packets) { if (resp.getPacketType() == ROW_RESPONSE) { synchronized (metric) { metric.incRecordsNum();
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java Tue May 12 10:02:17 2020 +0200 @@ -4,17 +4,15 @@ import com.passus.data.DataDecoder; import com.passus.net.mysql.MySqlDecoderContext; import com.passus.net.mysql.MySqlPacket; -import com.passus.net.mysql.MySqlResponseDecoder; +import com.passus.st.PacketsBulk; import com.passus.st.client.FlowContext; import com.passus.st.client.FlowHandlerDataDecoder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; - import static com.passus.st.client.FlowUtils.debug; -public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<List<MySqlPacket>> { +public class MySqlFlowHandlerDataDecoder implements FlowHandlerDataDecoder<PacketsBulk<MySqlPacket>> { private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class); @@ -25,7 +23,7 @@ } @Override - public List<MySqlPacket> getResult() { + public PacketsBulk<MySqlPacket> getResult() { return decoder.getResult(); }
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoder.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoder.java Tue May 12 10:02:17 2020 +0200 @@ -9,18 +9,18 @@ import com.passus.net.mysql.MySqlDecoderContext; import com.passus.net.mysql.MySqlPacket; import com.passus.net.mysql.MySqlResponseDecoder; +import com.passus.st.PacketsBulk; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.List; +import static com.passus.st.Protocols.MYSQL; -public class MySqlResponsePacketsDecoder extends AbstractDataDecoder<List<MySqlPacket>> { +public class MySqlResponsePacketsDecoder extends AbstractDataDecoder<PacketsBulk<MySqlPacket>> { private static final Logger LOGGER = LogManager.getLogger(MySqlResponsePacketsDecoder.class); - private final List<MySqlPacket> packets; + private final PacketsBulk<MySqlPacket> bulk; private final MySqlDecoderContext context; @@ -29,18 +29,18 @@ public MySqlResponsePacketsDecoder(MySqlDecoderContext context) { this.context = context; this.pdu = new MySqlPdu(context); - this.packets = new ArrayList<>(); + this.bulk = new PacketsBulk<>(MYSQL); } @Override - public List<MySqlPacket> getResult() { - return packets; + public PacketsBulk<MySqlPacket> getResult() { + return bulk; } @Override public void clear() { pdu.pdu.clear(); - packets.clear(); + bulk.packets.clear(); } @Override @@ -83,7 +83,7 @@ if (decoder.state() == MySqlDecoder.STATE_FINISHED) { MySqlPacket packet = decoder.getResult(); if (packet != null) { - packets.add(packet); + bulk.packets.add(packet); if (context.stage() == MySqlDecoderContext.STAGE_REQUEST || context.stage() == MySqlDecoderContext.STAGE_NONE) { state(STATE_FINISHED);
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java Tue May 12 10:02:17 2020 +0200 @@ -1,34 +1,35 @@ package com.passus.st.client.mysql.filter; import com.passus.net.mysql.MySqlPacket; +import com.passus.st.PacketsBulk; import com.passus.st.client.FlowContext; import com.passus.st.filter.FlowFilter; -import java.util.List; +import static com.passus.st.Protocols.MYSQL; public abstract class MySqlFilter implements FlowFilter { @Override public int filterInbound(Object req, Object resp, FlowContext context) { - if (req instanceof MySqlPacket || resp instanceof List) { - return filterInbound((MySqlPacket) req, (List<MySqlPacket>) resp, context); + if (req instanceof MySqlPacket || (resp instanceof PacketsBulk && ((PacketsBulk) resp).protocol == MYSQL)) { + return filterInbound((MySqlPacket) req, (PacketsBulk<MySqlPacket>) resp, context); } return DUNNO; } - public int filterInbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) { + public int filterInbound(MySqlPacket req, PacketsBulk<MySqlPacket> resp, FlowContext context) { return DUNNO; } @Override public int filterOutbound(Object req, Object resp, FlowContext context) { - if (req instanceof MySqlPacket || resp instanceof List) { - return filterOutbound((MySqlPacket) req, (List<MySqlPacket>) resp, context); + if (req instanceof MySqlPacket || (resp instanceof PacketsBulk && ((PacketsBulk) resp).protocol == MYSQL)) { + return filterOutbound((MySqlPacket) req, (PacketsBulk<MySqlPacket>) resp, context); } return DUNNO; } - public int filterOutbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) { + public int filterOutbound(MySqlPacket req, PacketsBulk<MySqlPacket> resp, FlowContext context) { return DUNNO; }
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Tue May 12 10:02:17 2020 +0200 @@ -10,6 +10,7 @@ import com.passus.net.mysql.MySqlPacket; import com.passus.net.mysql.MySqlPacketTypes; import com.passus.net.mysql.MySqlServerGreetingResponse; +import com.passus.st.PacketsBulk; import com.passus.st.client.FlowContext; import com.passus.st.client.credentials.Credentials; import com.passus.st.client.credentials.CredentialsProvider; @@ -17,8 +18,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; - import static com.passus.config.schema.ConfigurationSchemaBuilder.*; import static com.passus.net.mysql.MySqlUtils.passScramble411; import static com.passus.st.filter.CredentialsProviderNodeDefinition.providersNodeDef; @@ -52,9 +51,9 @@ } @Override - public int filterInbound(MySqlPacket req, List<MySqlPacket> resps, FlowContext context) { - if (resps.size() == 1) { - MySqlPacket resp = resps.get(0); + public int filterInbound(MySqlPacket req, PacketsBulk<MySqlPacket> bulk, FlowContext context) { + if (bulk.packets.size() == 1) { + MySqlPacket resp = bulk.packets.get(0); if (resp.getPacketType() == MySqlPacketTypes.GREETINGS) { context.setParam("mysql.greeting", resp); } @@ -93,7 +92,7 @@ } @Override - public int filterOutbound(MySqlPacket req, List<MySqlPacket> resp, FlowContext context) { + public int filterOutbound(MySqlPacket req, PacketsBulk<MySqlPacket> bulk, FlowContext context) { if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) { MySqlServerGreetingResponse greeting = context.getParamValue("mysql.greeting"); if (greeting != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoderTest.java Tue May 12 09:33:59 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/mysql/MySqlResponsePacketsDecoderTest.java Tue May 12 10:02:17 2020 +0200 @@ -3,6 +3,7 @@ import com.passus.net.mysql.*; import com.passus.net.packet.Tcp; import com.passus.net.source.pcap.PcapUtils; +import com.passus.st.PacketsBulk; import org.testng.annotations.Test; import java.util.Iterator; @@ -28,9 +29,9 @@ respDecoder.decode(it.next().getPayload()); assertEquals(STATE_FINISHED, respDecoder.state()); - List<MySqlPacket> packets = respDecoder.getResult(); - assertEquals(1, packets.size()); - assertTrue(packets.get(0) instanceof MySqlServerGreetingResponse); + PacketsBulk<MySqlPacket> bulk = respDecoder.getResult(); + assertEquals(1, bulk.packets.size()); + assertTrue(bulk.packets.get(0) instanceof MySqlServerGreetingResponse); reqDecoder.decode(it.next().getPayload()); assertEquals(STATE_FINISHED, reqDecoder.state()); @@ -80,9 +81,9 @@ respDecoder.decode(it.next().getPayload()); assertEquals(STATE_FINISHED, respDecoder.state()); - List<MySqlPacket> packets = respDecoder.getResult(); - assertEquals(6, packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count()); - assertEquals(1000, packets.stream().filter(p -> p instanceof MySqlRowResponse).count()); + PacketsBulk<MySqlPacket> bulk = respDecoder.getResult(); + assertEquals(6, bulk.packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count()); + assertEquals(1000, bulk.packets.stream().filter(p -> p instanceof MySqlRowResponse).count()); reqDecoder.decode(it.next().getPayload()); assertEquals(STATE_FINISHED, reqDecoder.state()); @@ -103,8 +104,8 @@ respDecoder.decode(it.next().getPayload()); assertEquals(STATE_FINISHED, respDecoder.state()); - List<MySqlPacket> packets = respDecoder.getResult(); - assertEquals(5, packets.size()); - assertEquals(4, packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count()); + PacketsBulk<MySqlPacket> bulk = respDecoder.getResult(); + assertEquals(5, bulk.packets.size()); + assertEquals(4, bulk.packets.stream().filter(p -> p instanceof MySqlColumnDefinition).count()); } } \ No newline at end of file