Mercurial > stress-tester
changeset 1088:dd193de63d88
AbstractFlowHandler - initialization changed
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java Fri May 08 09:45:09 2020 +0200 @@ -5,16 +5,27 @@ public abstract class AbstractFlowHandler<T extends Metric, R, S> implements FlowHandler { - private final FlowHandlerDataEncoder<R> encoder; - private final FlowHandlerDataDecoder<S> decoder; + private FlowHandlerDataEncoder<R> encoder; + private FlowHandlerDataDecoder<S> decoder; protected boolean collectMetrics = DEFAULT_COLLECT_METRICS; protected T metric; - public AbstractFlowHandler(FlowHandlerDataEncoder<R> encoder, FlowHandlerDataDecoder<S> decoder) { - this.encoder = encoder; - this.decoder = decoder; + protected abstract FlowHandlerDataEncoder<R> createEncoder(); + + protected abstract FlowHandlerDataDecoder<S> createDecoder(); + + @Override + public void init(FlowContext flowContext) { + decoder = createDecoder(); + encoder = createEncoder(); + if (collectMetrics) { + metric = createMetric(); + synchronized (metric) { + metric.activate(); + } + } } @Override @@ -37,17 +48,6 @@ @Override public void setCollectMetrics(boolean collectMetrics) { this.collectMetrics = collectMetrics; - if (collectMetrics) { - metric = createMetric(); - synchronized (metric) { - metric.activate(); - } - } else if (metric != null) { - synchronized (metric) { - metric.deactivate(); - } - metric = null; - } } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri May 08 09:45:09 2020 +0200 @@ -114,10 +114,10 @@ FlowContext flowContext = new FlowContext(session, new LinkedList<>()); flowContext.createLock(); - FlowHandler client = clientFactory.create(session.getProtocolId()); - client.init(flowContext); - client.setCollectMetrics(collectMetrics); - flowContext.client(client); + FlowHandler flowHandler = clientFactory.create(session.getProtocolId()); + flowHandler.init(flowContext); + flowHandler.setCollectMetrics(collectMetrics); + flowContext.client(flowHandler); sessions.put(session, flowContext); return flowContext; }
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java Fri May 08 09:45:09 2020 +0200 @@ -7,6 +7,8 @@ import com.passus.net.dns.DnsQuery; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; import static com.passus.st.Protocols.DNS; @@ -16,10 +18,14 @@ boolean collectMetrics = DEFAULT_COLLECT_METRICS; - DnsMetric metric; + @Override + protected FlowHandlerDataEncoder<Dns> createEncoder() { + return new DnsFlowHandlerDataEncoder(); + } - public DnsFlowHandler() { - super(new DnsFlowHandlerDataEncoder(), new DnsFlowHandlerDataDecoder()); + @Override + protected FlowHandlerDataDecoder<Dns> createDecoder() { + return new DnsFlowHandlerDataDecoder(); } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Fri May 08 09:45:09 2020 +0200 @@ -3,27 +3,30 @@ import com.passus.commons.Assert; import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; -import com.passus.net.mysql.MySqlErrorResponse; -import com.passus.net.mysql.MySqlPacket; -import com.passus.net.mysql.MySqlPacketTypes; -import com.passus.net.mysql.MySqlQueryCommand; -import com.passus.st.client.AbstractFlowHandler; -import com.passus.st.client.FlowContext; -import com.passus.st.client.WaitForResponseEvent; +import com.passus.net.mysql.*; +import com.passus.st.client.*; import com.passus.st.client.pgsql.PgSqlFlowHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import static com.passus.st.Protocols.NETFLOW; -public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware { +public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware { private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class); TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - public MySqlFlowHandler() { - super(new MySqlFlowHandlerDataEncoder(), new MySqlFlowHandlerDataDecoder()); + private final MySqlDecoderContext context = new MySqlDecoderContext(); + + @Override + protected FlowHandlerDataEncoder<MySqlPacket> createEncoder() { + return new MySqlFlowHandlerDataEncoder(context); + } + + @Override + protected FlowHandlerDataDecoder<MySqlPacket> createDecoder() { + return new MySqlFlowHandlerDataDecoder(context); } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java Fri May 08 09:45:09 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.ByteBuff; import com.passus.data.DataDecoder; +import com.passus.net.mysql.MySqlDecoderContext; import com.passus.net.mysql.MySqlPacket; import com.passus.net.mysql.MySqlResponseDecoder; import com.passus.st.client.FlowContext; @@ -15,10 +16,10 @@ private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class); - private MySqlResponseDecoder decoder = new MySqlResponseDecoder(); + private final MySqlResponseDecoder decoder; - public MySqlFlowHandlerDataDecoder() { - + public MySqlFlowHandlerDataDecoder(MySqlDecoderContext context) { + decoder = new MySqlResponseDecoder(context); } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java Fri May 08 09:45:09 2020 +0200 @@ -1,6 +1,7 @@ package com.passus.st.client.mysql; import com.passus.data.ByteBuff; +import com.passus.net.mysql.MySqlDecoderContext; import com.passus.net.mysql.MySqlEncoder; import com.passus.net.mysql.MySqlPacket; import com.passus.st.client.FlowContext; @@ -8,7 +9,14 @@ public class MySqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<MySqlPacket> { - private final MySqlEncoder encoder = new MySqlEncoder(); + private final MySqlEncoder encoder; + + private final MySqlDecoderContext context; + + public MySqlFlowHandlerDataEncoder(MySqlDecoderContext context) { + this.encoder = new MySqlEncoder(); + this.context = context; + } @Override public void encode(MySqlPacket request, FlowContext flowContext, ByteBuff out) {
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Fri May 08 09:45:09 2020 +0200 @@ -82,9 +82,11 @@ return; } +/* loginReq.setClientCapabilities(0x0000a207); + loginReq.setExtendedClientCapabilities(0x0000013e);*/ loginReq.setUsername(credentials.getUser()); loginReq.setAuthPlugin(authPlugin); - loginReq.setAttributes(null); + //loginReq.setAttributes(null); } }
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java Fri May 08 09:45:09 2020 +0200 @@ -6,6 +6,8 @@ import com.passus.net.netflow.Netflow; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; import static com.passus.st.Protocols.NETFLOW; @@ -13,8 +15,14 @@ TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - public NetflowFlowHandler() { - super(new NetflowFlowHandlerDataEncoder(), null); + @Override + protected FlowHandlerDataEncoder<Netflow> createEncoder() { + return new NetflowFlowHandlerDataEncoder(); + } + + @Override + protected FlowHandlerDataDecoder<Void> createDecoder() { + return null; } @Override @@ -40,6 +48,7 @@ @Override public void init(FlowContext flowContext) { + super.init(flowContext); flowContext.setBidirectional(false); }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandler.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandler.java Fri May 08 09:45:09 2020 +0200 @@ -9,6 +9,8 @@ import com.passus.net.pgsql.PgSqlSimpleQueryMessage; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -20,8 +22,14 @@ TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - public PgSqlFlowHandler() { - super(new PgSqlFlowHandlerDataEncoder(), new PgSqlFlowHandlerDataDecoder()); + @Override + protected FlowHandlerDataEncoder<PgSqlMessage> createEncoder() { + return new PgSqlFlowHandlerDataEncoder(); + } + + @Override + protected FlowHandlerDataDecoder<PgSqlMessage> createDecoder() { + return new PgSqlFlowHandlerDataDecoder(); } @Override
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Thu May 07 13:56:28 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Fri May 08 09:45:09 2020 +0200 @@ -8,6 +8,7 @@ import com.passus.net.dns.DnsARecord; import com.passus.net.dns.DnsBuilder; import com.passus.net.dns.DnsEncoder; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.Protocols; import com.passus.st.client.Event; import com.passus.st.client.FlowExecutor;
--- a/stress-tester/src/test/java/com/passus/st/utils/NioServer.java Thu May 07 13:56:28 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,139 +0,0 @@ -package com.passus.st.utils; - -import com.passus.commons.service.Service; -import com.passus.commons.service.ServiceException; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class NioServer implements Service { - - private static final Logger LOGGER = LogManager.getLogger(NioServer.class); - - private String address = "localhost"; - - private int port = 5000; - - private ChannelFuture channel; - - private EventLoopGroup masterGroup; - - private EventLoopGroup slaveGroup; - - private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter(); - - private ChannelHandler serverHandler; - - private boolean started; - - public String getAddress() { - return address; - } - - public void setAddress(String address) { - this.address = address; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public ChannelHandler getServerHandler() { - return serverHandler; - } - - public void setServerHandler(ChannelHandler serverHandler) { - this.serverHandler = serverHandler; - } - - public int getConnections() { - return connectionsCounter.getConnections(); - } - - public int getMaxConnections() { - return connectionsCounter.getMaxConnections(); - } - - public void setMaxConnections(int maxConnections) { - connectionsCounter.setMaxConnections(maxConnections); - } - - @Override - public boolean isStarted() { - return started; - } - - @Override - public void start() { - if (started) { - return; - } - - try { - masterGroup = new NioEventLoopGroup(); - slaveGroup = new NioEventLoopGroup(); - ServerBootstrap bs = new ServerBootstrap(); - bs.option(ChannelOption.SO_BACKLOG, 1024); - - bs.group(masterGroup, slaveGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer<SocketChannel>() { - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); - ctx.close(); - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - p.addLast(connectionsCounter); - p.addLast(serverHandler); - } - - }); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Starting Netty server {}:{}.", address, port); - } - - channel = bs.bind(address, port); - this.started = true; - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - stop(); - throw new ServiceException(ex.getMessage(), ex); - } - } - - @Override - public void stop() { - if (!started) { - return; - } - - slaveGroup.shutdownGracefully(); - masterGroup.shutdownGracefully(); - - try { - LOGGER.debug("Stopping Netty server {}:{}.", address, port); - channel.channel().closeFuture().sync(); - } catch (InterruptedException ignore) { - } - - slaveGroup = null; - masterGroup = null; - channel = null; - - started = false; - } -}