changeset 1088:dd193de63d88

AbstractFlowHandler - initialization changed
author Devel 2
date Fri, 08 May 2020 09:45:09 +0200
parents 2600ca3bdfd6
children b1c79edc6d5e
files stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandler.java stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java stress-tester/src/test/java/com/passus/st/utils/NioServer.java
diffstat 11 files changed, 80 insertions(+), 181 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractFlowHandler.java	Fri May 08 09:45:09 2020 +0200
@@ -5,16 +5,27 @@
 
 public abstract class AbstractFlowHandler<T extends Metric, R, S> implements FlowHandler {
 
-    private final FlowHandlerDataEncoder<R> encoder;
-    private final FlowHandlerDataDecoder<S> decoder;
+    private FlowHandlerDataEncoder<R> encoder;
+    private FlowHandlerDataDecoder<S> decoder;
 
     protected boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
     protected T metric;
 
-    public AbstractFlowHandler(FlowHandlerDataEncoder<R> encoder, FlowHandlerDataDecoder<S> decoder) {
-        this.encoder = encoder;
-        this.decoder = decoder;
+    protected abstract FlowHandlerDataEncoder<R> createEncoder();
+
+    protected abstract FlowHandlerDataDecoder<S> createDecoder();
+
+    @Override
+    public void init(FlowContext flowContext) {
+        decoder = createDecoder();
+        encoder = createEncoder();
+        if (collectMetrics) {
+            metric = createMetric();
+            synchronized (metric) {
+                metric.activate();
+            }
+        }
     }
 
     @Override
@@ -37,17 +48,6 @@
     @Override
     public void setCollectMetrics(boolean collectMetrics) {
         this.collectMetrics = collectMetrics;
-        if (collectMetrics) {
-            metric = createMetric();
-            synchronized (metric) {
-                metric.activate();
-            }
-        } else if (metric != null) {
-            synchronized (metric) {
-                metric.deactivate();
-            }
-            metric = null;
-        }
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Fri May 08 09:45:09 2020 +0200
@@ -114,10 +114,10 @@
 
         FlowContext flowContext = new FlowContext(session, new LinkedList<>());
         flowContext.createLock();
-        FlowHandler client = clientFactory.create(session.getProtocolId());
-        client.init(flowContext);
-        client.setCollectMetrics(collectMetrics);
-        flowContext.client(client);
+        FlowHandler flowHandler = clientFactory.create(session.getProtocolId());
+        flowHandler.init(flowContext);
+        flowHandler.setCollectMetrics(collectMetrics);
+        flowContext.client(flowHandler);
         sessions.put(session, flowContext);
         return flowContext;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/dns/DnsFlowHandler.java	Fri May 08 09:45:09 2020 +0200
@@ -7,6 +7,8 @@
 import com.passus.net.dns.DnsQuery;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.DNS;
 
@@ -16,10 +18,14 @@
 
     boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
-    DnsMetric metric;
+    @Override
+    protected FlowHandlerDataEncoder<Dns> createEncoder() {
+        return new DnsFlowHandlerDataEncoder();
+    }
 
-    public DnsFlowHandler() {
-        super(new DnsFlowHandlerDataEncoder(), new DnsFlowHandlerDataDecoder());
+    @Override
+    protected FlowHandlerDataDecoder<Dns> createDecoder() {
+        return new DnsFlowHandlerDataDecoder();
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Fri May 08 09:45:09 2020 +0200
@@ -3,27 +3,30 @@
 import com.passus.commons.Assert;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
-import com.passus.net.mysql.MySqlErrorResponse;
-import com.passus.net.mysql.MySqlPacket;
-import com.passus.net.mysql.MySqlPacketTypes;
-import com.passus.net.mysql.MySqlQueryCommand;
-import com.passus.st.client.AbstractFlowHandler;
-import com.passus.st.client.FlowContext;
-import com.passus.st.client.WaitForResponseEvent;
+import com.passus.net.mysql.*;
+import com.passus.st.client.*;
 import com.passus.st.client.pgsql.PgSqlFlowHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import static com.passus.st.Protocols.NETFLOW;
 
-public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
+public final class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
 
     private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandler.class);
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    public MySqlFlowHandler() {
-        super(new MySqlFlowHandlerDataEncoder(), new MySqlFlowHandlerDataDecoder());
+    private final MySqlDecoderContext context = new MySqlDecoderContext();
+
+    @Override
+    protected FlowHandlerDataEncoder<MySqlPacket> createEncoder() {
+        return new MySqlFlowHandlerDataEncoder(context);
+    }
+
+    @Override
+    protected FlowHandlerDataDecoder<MySqlPacket> createDecoder() {
+        return new MySqlFlowHandlerDataDecoder(context);
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataDecoder.java	Fri May 08 09:45:09 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
+import com.passus.net.mysql.MySqlDecoderContext;
 import com.passus.net.mysql.MySqlPacket;
 import com.passus.net.mysql.MySqlResponseDecoder;
 import com.passus.st.client.FlowContext;
@@ -15,10 +16,10 @@
 
     private static final Logger LOGGER = LogManager.getLogger(MySqlFlowHandlerDataDecoder.class);
 
-    private MySqlResponseDecoder decoder = new MySqlResponseDecoder();
+    private final MySqlResponseDecoder decoder;
 
-    public MySqlFlowHandlerDataDecoder() {
-
+    public MySqlFlowHandlerDataDecoder(MySqlDecoderContext context) {
+        decoder = new MySqlResponseDecoder(context);
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandlerDataEncoder.java	Fri May 08 09:45:09 2020 +0200
@@ -1,6 +1,7 @@
 package com.passus.st.client.mysql;
 
 import com.passus.data.ByteBuff;
+import com.passus.net.mysql.MySqlDecoderContext;
 import com.passus.net.mysql.MySqlEncoder;
 import com.passus.net.mysql.MySqlPacket;
 import com.passus.st.client.FlowContext;
@@ -8,7 +9,14 @@
 
 public class MySqlFlowHandlerDataEncoder implements FlowHandlerDataEncoder<MySqlPacket> {
 
-    private final MySqlEncoder encoder = new MySqlEncoder();
+    private final MySqlEncoder encoder;
+
+    private final MySqlDecoderContext context;
+
+    public MySqlFlowHandlerDataEncoder(MySqlDecoderContext context) {
+        this.encoder = new MySqlEncoder();
+        this.context = context;
+    }
 
     @Override
     public void encode(MySqlPacket request, FlowContext flowContext, ByteBuff out) {
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Fri May 08 09:45:09 2020 +0200
@@ -82,9 +82,11 @@
                     return;
             }
 
+/*            loginReq.setClientCapabilities(0x0000a207);
+            loginReq.setExtendedClientCapabilities(0x0000013e);*/
             loginReq.setUsername(credentials.getUser());
             loginReq.setAuthPlugin(authPlugin);
-            loginReq.setAttributes(null);
+            //loginReq.setAttributes(null);
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java	Fri May 08 09:45:09 2020 +0200
@@ -6,6 +6,8 @@
 import com.passus.net.netflow.Netflow;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
 
 import static com.passus.st.Protocols.NETFLOW;
 
@@ -13,8 +15,14 @@
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    public NetflowFlowHandler() {
-        super(new NetflowFlowHandlerDataEncoder(), null);
+    @Override
+    protected FlowHandlerDataEncoder<Netflow> createEncoder() {
+        return new NetflowFlowHandlerDataEncoder();
+    }
+
+    @Override
+    protected FlowHandlerDataDecoder<Void> createDecoder() {
+        return null;
     }
 
     @Override
@@ -40,6 +48,7 @@
 
     @Override
     public void init(FlowContext flowContext) {
+        super.init(flowContext);
         flowContext.setBidirectional(false);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandler.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandler.java	Fri May 08 09:45:09 2020 +0200
@@ -9,6 +9,8 @@
 import com.passus.net.pgsql.PgSqlSimpleQueryMessage;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -20,8 +22,14 @@
 
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
-    public PgSqlFlowHandler() {
-        super(new PgSqlFlowHandlerDataEncoder(), new PgSqlFlowHandlerDataDecoder());
+    @Override
+    protected FlowHandlerDataEncoder<PgSqlMessage> createEncoder() {
+        return new PgSqlFlowHandlerDataEncoder();
+    }
+
+    @Override
+    protected FlowHandlerDataDecoder<PgSqlMessage> createDecoder() {
+        return new PgSqlFlowHandlerDataDecoder();
     }
 
     @Override
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Thu May 07 13:56:28 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Fri May 08 09:45:09 2020 +0200
@@ -8,6 +8,7 @@
 import com.passus.net.dns.DnsARecord;
 import com.passus.net.dns.DnsBuilder;
 import com.passus.net.dns.DnsEncoder;
+import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.Protocols;
 import com.passus.st.client.Event;
 import com.passus.st.client.FlowExecutor;
--- a/stress-tester/src/test/java/com/passus/st/utils/NioServer.java	Thu May 07 13:56:28 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,139 +0,0 @@
-package com.passus.st.utils;
-
-import com.passus.commons.service.Service;
-import com.passus.commons.service.ServiceException;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class NioServer implements Service {
-
-    private static final Logger LOGGER = LogManager.getLogger(NioServer.class);
-
-    private String address = "localhost";
-
-    private int port = 5000;
-
-    private ChannelFuture channel;
-
-    private EventLoopGroup masterGroup;
-
-    private EventLoopGroup slaveGroup;
-
-    private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter();
-
-    private ChannelHandler serverHandler;
-
-    private boolean started;
-
-    public String getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = address;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    public ChannelHandler getServerHandler() {
-        return serverHandler;
-    }
-
-    public void setServerHandler(ChannelHandler serverHandler) {
-        this.serverHandler = serverHandler;
-    }
-
-    public int getConnections() {
-        return connectionsCounter.getConnections();
-    }
-
-    public int getMaxConnections() {
-        return connectionsCounter.getMaxConnections();
-    }
-
-    public void setMaxConnections(int maxConnections) {
-        connectionsCounter.setMaxConnections(maxConnections);
-    }
-
-    @Override
-    public boolean isStarted() {
-        return started;
-    }
-
-    @Override
-    public void start() {
-        if (started) {
-            return;
-        }
-
-        try {
-            masterGroup = new NioEventLoopGroup();
-            slaveGroup = new NioEventLoopGroup();
-            ServerBootstrap bs = new ServerBootstrap();
-            bs.option(ChannelOption.SO_BACKLOG, 1024);
-
-            bs.group(masterGroup, slaveGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-
-                        @Override
-                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-                            LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
-                            ctx.close();
-                        }
-
-                        @Override
-                        protected void initChannel(SocketChannel ch) throws Exception {
-                            ChannelPipeline p = ch.pipeline();
-                            p.addLast(connectionsCounter);
-                            p.addLast(serverHandler);
-                        }
-
-                    });
-
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Starting Netty server {}:{}.", address, port);
-            }
-
-            channel = bs.bind(address, port);
-            this.started = true;
-        } catch (Exception ex) {
-            LOGGER.error(ex.getMessage(), ex);
-            stop();
-            throw new ServiceException(ex.getMessage(), ex);
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (!started) {
-            return;
-        }
-
-        slaveGroup.shutdownGracefully();
-        masterGroup.shutdownGracefully();
-
-        try {
-            LOGGER.debug("Stopping Netty server {}:{}.", address, port);
-            channel.channel().closeFuture().sync();
-        } catch (InterruptedException ignore) {
-        }
-
-        slaveGroup = null;
-        masterGroup = null;
-        channel = null;
-
-        started = false;
-    }
-}