changeset 993:604e4ef38dee

Test NioServer
author Devel 2
date Fri, 13 Sep 2019 12:06:13 +0200
parents 2e7e654f2ad5
children 594cf55e9e91
files stress-tester/src/test/java/com/passus/st/utils/NioConnectionsCounter.java stress-tester/src/test/java/com/passus/st/utils/NioDecoderWrapper.java stress-tester/src/test/java/com/passus/st/utils/NioServer.java
diffstat 3 files changed, 252 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/NioConnectionsCounter.java	Fri Sep 13 12:06:13 2019 +0200
@@ -0,0 +1,62 @@
+package com.passus.st.utils;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static io.netty.channel.ChannelHandler.Sharable;
+
+/**
+ * Sharable inbound handler that caps the number of simultaneously admitted channels.
+ *
+ * <p>A channel that becomes active while the limit is already reached is closed and its
+ * {@code channelActive} event is not propagated further down the pipeline. Only channels
+ * that were actually admitted release a slot when they become inactive, so rejected
+ * connections can never drive the counter below the real number of open connections.
+ *
+ * <p>All mutable state is guarded by the monitor of this instance.
+ */
+@Sharable
+public class NioConnectionsCounter extends ChannelInboundHandlerAdapter {
+
+    /** Contexts of channels that were admitted and have not become inactive yet. */
+    private final Set<ChannelHandlerContext> accepted = new HashSet<>();
+
+    /** Maximum number of channels admitted at the same time. */
+    private int maxConnections = 100;
+
+    /** Creates a counter with the default limit of 100 connections. */
+    public NioConnectionsCounter() {
+    }
+
+    /**
+     * Creates a counter with the given limit.
+     *
+     * @param maxConnections maximum number of simultaneously admitted channels
+     */
+    public NioConnectionsCounter(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * @return the current limit of simultaneously admitted channels
+     */
+    public int getMaxConnections() {
+        // Read under the same lock the setter uses so a concurrent update is visible.
+        synchronized (this) {
+            return maxConnections;
+        }
+    }
+
+    /**
+     * @return the number of channels currently admitted by this handler
+     */
+    public int getConnections() {
+        synchronized (this) {
+            return accepted.size();
+        }
+    }
+
+    /**
+     * Changes the limit. Channels that are already admitted are not closed when the
+     * limit is lowered; the new value only affects channels that become active later.
+     *
+     * @param maxConnections new maximum number of simultaneously admitted channels
+     */
+    public void setMaxConnections(int maxConnections) {
+        synchronized (this) {
+            this.maxConnections = maxConnections;
+        }
+    }
+
+    /**
+     * Admits the channel when a slot is free, otherwise closes it without propagating
+     * the event.
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        boolean activate = false;
+        synchronized (this) {
+            if (accepted.size() < maxConnections) {
+                accepted.add(ctx);
+                activate = true;
+            }
+        }
+
+        if (activate) {
+            super.channelActive(ctx);
+        } else {
+            ctx.close();
+        }
+    }
+
+    /**
+     * Propagates the event and releases the slot held by the channel, if it had one.
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        try {
+            super.channelInactive(ctx);
+        } finally {
+            synchronized (this) {
+                // A channel rejected in channelActive was never added, so removing it is
+                // a no-op and the count is left untouched.
+                accepted.remove(ctx);
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/NioDecoderWrapper.java	Fri Sep 13 12:06:13 2019 +0200
@@ -0,0 +1,55 @@
+package com.passus.st.utils;
+
+import com.passus.data.DataDecoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Channel handler that feeds the readable bytes of every inbound {@link ByteBuf} to a
+ * {@link DataDecoder} and reports the outcome through overridable callbacks.
+ *
+ * @param <T> type of the object produced by the wrapped decoder
+ */
+public abstract class NioDecoderWrapper<T> extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private static final Logger LOGGER = LogManager.getLogger(NioDecoderWrapper.class);
+
+    private final DataDecoder<T> decoder;
+
+    /**
+     * @param decoder decoder that receives the bytes read from the channel
+     */
+    public NioDecoderWrapper(DataDecoder<T> decoder) {
+        this.decoder = decoder;
+    }
+
+    /**
+     * Passes the readable region of {@code in} to the decoder, advances the reader index
+     * by the value the decoder returned and then dispatches on the decoder state.
+     */
+    @Override
+    protected final void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        final int readable = in.readableBytes();
+        final int readerIndex = in.readerIndex();
+
+        final byte[] bytes;
+        final int from;
+        if (in.hasArray()) {
+            // Heap buffer: decode straight from the backing array, no copy needed.
+            from = in.arrayOffset() + readerIndex;
+            bytes = in.array();
+        } else {
+            // No accessible backing array: copy the readable region without moving the
+            // reader index.
+            from = 0;
+            bytes = new byte[readable];
+            in.getBytes(readerIndex, bytes);
+        }
+
+        final int consumed = decoder.decode(bytes, from, from + readable);
+        in.readerIndex(readerIndex + consumed);
+
+        if (decoder.state() == DataDecoder.STATE_ERROR) {
+            LOGGER.error("Decoder error. " + decoder.getLastError());
+            ctx.close();
+            onDecoderError(decoder.getLastError(), ctx);
+            return;
+        }
+
+        if (decoder.state() == DataDecoder.STATE_FINISHED) {
+            onDataDecoded(decoder.getResult(), ctx);
+        }
+    }
+
+    /**
+     * Invoked when the decoder reports {@code STATE_FINISHED}. Does nothing by default.
+     *
+     * @param data result obtained from the decoder
+     * @param ctx  context of the channel the bytes were read from
+     */
+    protected void onDataDecoded(T data, ChannelHandlerContext ctx) {
+        // no-op by default
+    }
+
+    /**
+     * Invoked when the decoder reports {@code STATE_ERROR}, after the error has been
+     * logged and a close of the channel has been requested. Does nothing by default.
+     *
+     * @param errorMsg last error reported by the decoder
+     * @param ctx      context of the channel the bytes were read from
+     */
+    protected void onDecoderError(String errorMsg, ChannelHandlerContext ctx) {
+        // no-op by default
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/NioServer.java	Fri Sep 13 12:06:13 2019 +0200
@@ -0,0 +1,135 @@
+package com.passus.st.utils;
+
+import com.passus.commons.service.Service;
+import com.passus.commons.service.ServiceException;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Small Netty based TCP server used by tests.
+ *
+ * <p>Every accepted channel gets the shared {@link NioConnectionsCounter} followed by the
+ * configured server handler, if any. {@link #start()} blocks until the listening socket
+ * is bound (or the bind failed), and {@link #stop()} closes the listening socket before
+ * shutting the event loops down.
+ */
+public class NioServer implements Service {
+
+    private static final Logger LOGGER = LogManager.getLogger(NioServer.class);
+
+    private String address = "localhost";
+
+    private int port = 5000;
+
+    /** Completed bind future of the listening channel; non-null only while started. */
+    private ChannelFuture channel;
+
+    private EventLoopGroup masterGroup;
+
+    private EventLoopGroup slaveGroup;
+
+    private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter();
+
+    private ChannelHandler serverHandler;
+
+    private boolean started;
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * @return the handler appended to every accepted channel, or {@code null} if none is set
+     */
+    public ChannelHandler getServerHandler() {
+        return serverHandler;
+    }
+
+    /**
+     * Sets the handler appended to the pipeline of every accepted channel, after the
+     * connections counter. The same instance is added to every channel, so it has to be
+     * a handler Netty allows to be shared between pipelines. Set it before calling
+     * {@link #start()}.
+     *
+     * @param serverHandler handler to install, or {@code null} to install none
+     */
+    public void setServerHandler(ChannelHandler serverHandler) {
+        this.serverHandler = serverHandler;
+    }
+
+    public int getConnections() {
+        return connectionsCounter.getConnections();
+    }
+
+    public int getMaxConnections() {
+        return connectionsCounter.getMaxConnections();
+    }
+
+    public void setMaxConnections(int maxConnections) {
+        connectionsCounter.setMaxConnections(maxConnections);
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Creates the event loops and binds the listening socket, waiting for the bind to
+     * complete. Does nothing when the server is already started.
+     *
+     * @throws ServiceException if the server could not be started; the event loops
+     *                          created so far are shut down before the exception is thrown
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            masterGroup = new NioEventLoopGroup();
+            slaveGroup = new NioEventLoopGroup();
+            ServerBootstrap bs = new ServerBootstrap();
+            bs.option(ChannelOption.SO_BACKLOG, 1024);
+
+            bs.group(masterGroup, slaveGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                            LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
+                            ctx.close();
+                        }
+
+                        @Override
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ChannelPipeline p = ch.pipeline();
+                            p.addLast(connectionsCounter);
+                            // Without a configured handler only the counter is installed.
+                            if (serverHandler != null) {
+                                p.addLast(serverHandler);
+                            }
+                        }
+
+                    });
+
+            LOGGER.debug("Starting Netty server {}:{}.", address, port);
+
+            // bind() is asynchronous; wait for it so that a failure (e.g. the port being
+            // in use) surfaces here instead of leaving a server that only looks started.
+            channel = bs.bind(address, port).sync();
+
+            LOGGER.debug("Server {}:{} started.", address, port);
+            this.started = true;
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                // Keep the interruption visible to the caller.
+                Thread.currentThread().interrupt();
+            }
+            LOGGER.error(ex.getMessage(), ex);
+            // stop() is a no-op while 'started' is false, so release resources directly.
+            channel = null;
+            shutdownGroups();
+            throw new ServiceException(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Closes the listening socket, waits for it to be closed and shuts the event loops
+     * down. Does nothing when the server is not started.
+     */
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        LOGGER.debug("Stopping Netty server {}:{}.", address, port);
+        try {
+            // Request the close explicitly, then wait for it to complete.
+            channel.channel().close();
+            channel.channel().closeFuture().sync();
+        } catch (InterruptedException ex) {
+            // Restore the flag; cleanup below still runs.
+            Thread.currentThread().interrupt();
+        } finally {
+            shutdownGroups();
+            channel = null;
+            started = false;
+        }
+    }
+
+    /** Requests a graceful shutdown of whichever event loop groups exist and drops them. */
+    private void shutdownGroups() {
+        if (slaveGroup != null) {
+            slaveGroup.shutdownGracefully();
+            slaveGroup = null;
+        }
+        if (masterGroup != null) {
+            masterGroup.shutdownGracefully();
+            masterGroup = null;
+        }
+    }
+}