Mercurial > stress-tester
changeset 1058:8535ce392b5e
AbstractFlowHandler enhanced
author | Devel 2 |
---|---|
date | Thu, 16 Apr 2020 09:41:15 +0200 |
parents | b3d44dd719f1 |
children | 1d47e67bad78 |
files | stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java |
diffstat | 5 files changed, 52 insertions(+), 88 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Thu Apr 16 09:20:53 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Thu Apr 16 09:41:15 2020 +0200 @@ -3,12 +3,30 @@ import com.passus.commons.metric.Metric; import com.passus.st.metric.MetricsContainer; -public abstract class AbstractFlowHandler<T extends Metric> implements FlowHandler { +public abstract class AbstractFlowHandler<T extends Metric, R, S> implements FlowHandler { + + private final FlowHandlerDataEncoder<R> encoder; + private final FlowHandlerDataDecoder<S> decoder; protected boolean collectMetrics = false; protected T metric; + public AbstractFlowHandler(FlowHandlerDataEncoder<R> encoder, FlowHandlerDataDecoder<S> decoder) { + this.encoder = encoder; + this.decoder = decoder; + } + + @Override + public FlowHandlerDataDecoder<S> getResponseDecoder(FlowContext flowContext) { + return decoder; + } + + @Override + public FlowHandlerDataEncoder<R> getRequestEncoder(FlowContext flowContext) { + return encoder; + } + protected abstract T createMetric(); @Override @@ -41,5 +59,20 @@ } } } - + + @Override + public void onRequestSent(Object request, FlowContext flowContext) { + onRequestSent0((R) request, flowContext); + } + + protected void onRequestSent0(R request, FlowContext flowContext) { + } + + @Override + public void onResponseReceived(Object response, FlowContext flowContext) { + onResponseReceived0((S) response, flowContext); + } + + protected void onResponseReceived0(S response, FlowContext flowContext) { + } }
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Thu Apr 16 09:20:53 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Thu Apr 16 09:41:15 2020 +0200 @@ -7,16 +7,10 @@ import com.passus.net.dns.DnsQuery; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; -import com.passus.st.client.FlowHandlerDataDecoder; -import com.passus.st.client.FlowHandlerDataEncoder; import static com.passus.st.Protocols.DNS; -public class DnsFlowHandler extends AbstractFlowHandler<DnsMetric> implements TimeAware { - - private final DnsFlowHandlerDataDecoder decoder; - - private final DnsFlowHandlerDataEncoder encoder; +public class DnsFlowHandler extends AbstractFlowHandler<DnsMetric, Dns, Dns> implements TimeAware { TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); @@ -25,8 +19,7 @@ DnsMetric metric; public DnsFlowHandler() { - this.decoder = new DnsFlowHandlerDataDecoder(this); - this.encoder = new DnsFlowHandlerDataEncoder(); + super(new DnsFlowHandlerDataEncoder(), new DnsFlowHandlerDataDecoder()); } @Override @@ -51,20 +44,9 @@ } @Override - public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { - return decoder; - } - - @Override - public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { - return encoder; - } - - @Override - public void onRequestSent(Object request, FlowContext flowContext) { + protected void onRequestSent0(Dns dns, FlowContext flowContext) { if (collectMetrics) { synchronized (metric) { - Dns dns = (Dns) request; if (dns.getQueriesCount() > 0) { DnsQuery query = dns.getQueries().get(0); metric.addSentRecordType(query.getType()); @@ -74,10 +56,9 @@ } @Override - public void onResponseReceived(Object response, FlowContext flowContext) { + protected void onResponseReceived0(Dns dns, FlowContext flowContext) { if (collectMetrics) { synchronized (metric) { - Dns dns = (Dns) response; metric.addReplyCode(dns.getReplyCode()); if (dns.getQueriesCount() > 0) { dns.getAnswers().forEach((r) -> {
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java Thu Apr 16 09:20:53 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandlerDataDecoder.java Thu Apr 16 09:41:15 2020 +0200 @@ -16,13 +16,7 @@ private static final Logger LOGGER = LogManager.getLogger(DnsFlowHandlerDataDecoder.class); private DnsDecoder decoder = new DnsDecoder(); - - private final DnsFlowHandler handler; - - public DnsFlowHandlerDataDecoder(DnsFlowHandler handler) { - this.handler = handler; - } - + @Override public Dns getResult() { return decoder.getResult(); @@ -42,7 +36,7 @@ public int decode(ByteBuff buffer, FlowContext flowContext) { int res = decoder.decode(buffer); if (decoder.state() == DataDecoder.STATE_FINISHED) { - long now = handler.timeGenerator.currentTimeMillis(); + long now = System.currentTimeMillis(); if (LOGGER.isDebugEnabled()) { debug(LOGGER, flowContext, @@ -51,19 +45,6 @@ now - flowContext.receivedStartTimestamp() ); } - - //TODO Dorobic metryki - /*if (client.isCollectMetrics()) { - synchronized (client.metric) { - client.metric.incResponsesNum(); - client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); - client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); - if (lastRequest != null) { - client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); - } - } - }*/ - } return res;
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java Thu Apr 16 09:20:53 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java Thu Apr 16 09:41:15 2020 +0200 @@ -6,19 +6,15 @@ import com.passus.net.netflow.Netflow; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; -import com.passus.st.client.FlowHandlerDataDecoder; -import com.passus.st.client.FlowHandlerDataEncoder; import static com.passus.st.Protocols.NETFLOW; -public class NetflowFlowHandler extends AbstractFlowHandler<NetflowMetric> implements TimeAware { - - private final NetflowFlowHandlerDataEncoder encoder; +public class NetflowFlowHandler extends AbstractFlowHandler<NetflowMetric, Netflow, Void> implements TimeAware { TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); public NetflowFlowHandler() { - this.encoder = new NetflowFlowHandlerDataEncoder(); + super(new NetflowFlowHandlerDataEncoder(), null); } @Override @@ -32,16 +28,6 @@ } @Override - public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { - return null; - } - - @Override - public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { - return encoder; - } - - @Override public TimeGenerator getTimeGenerator() { return timeGenerator; } @@ -58,9 +44,8 @@ } @Override - public void onRequestSent(Object request, FlowContext flowContext) { + protected void onRequestSent0(Netflow netflow, FlowContext flowContext) { if (collectMetrics) { - Netflow netflow = (Netflow) request; synchronized (metric) { metric.addNetflow(netflow); }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java Thu Apr 16 09:20:53 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java Thu Apr 16 09:41:15 2020 +0200 @@ -8,25 +8,15 @@ import com.passus.net.pgsql.PgSqlSimpleQueryMessage; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; -import com.passus.st.client.FlowHandlerDataDecoder; -import com.passus.st.client.FlowHandlerDataEncoder; import static com.passus.st.Protocols.NETFLOW; -public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric> implements TimeAware { - - private final PgSqlFlowHandlerDataEncoder encoder; - private final PgSqlFlowHandlerDataDecoder decoder; +public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric, PgSqlMessage, PgSqlMessage> implements TimeAware { TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - boolean collectMetrics = false; - - private PgSqlMetric metric; - public PgSqlFlowHandlerNetflowFlowHandler() { - this.encoder = new PgSqlFlowHandlerDataEncoder(); - this.decoder = new PgSqlFlowHandlerDataDecoder(); + super(new PgSqlFlowHandlerDataEncoder(), new PgSqlFlowHandlerDataDecoder()); } @Override @@ -40,18 +30,7 @@ } @Override - public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { - return decoder; - } - - @Override - public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { - return encoder; - } - - @Override - public void onRequestSent(Object request, FlowContext flowContext) { - PgSqlMessage msg = (PgSqlMessage) request; + protected void onRequestSent0(PgSqlMessage msg, FlowContext flowContext) { if (collectMetrics) { if (msg.getType() == PgSqlMessageType.SIMPLE_QUERY) { PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) msg; @@ -63,6 +42,11 @@ } @Override + protected void onResponseReceived0(PgSqlMessage msg, FlowContext flowContext) { + //flowContext.channelContext().close(); + } + + @Override public TimeGenerator getTimeGenerator() { return timeGenerator; }