Mercurial > stress-tester
changeset 993:604e4ef38dee
Test NioServer
author | Devel 2 |
---|---|
date | Fri, 13 Sep 2019 12:06:13 +0200 |
parents | 2e7e654f2ad5 |
children | 594cf55e9e91 |
files | stress-tester/src/test/java/com/passus/st/utils/NioConnectionsCounter.java stress-tester/src/test/java/com/passus/st/utils/NioDecoderWrapper.java stress-tester/src/test/java/com/passus/st/utils/NioServer.java |
diffstat | 3 files changed, 252 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/NioConnectionsCounter.java Fri Sep 13 12:06:13 2019 +0200 @@ -0,0 +1,62 @@ +package com.passus.st.utils; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import static io.netty.channel.ChannelHandler.Sharable; + +@Sharable +public class NioConnectionsCounter extends ChannelInboundHandlerAdapter { + + private int connections = 0; + + private int maxConnections = 100; + + public NioConnectionsCounter() { + } + + public NioConnectionsCounter(int maxConnections) { + this.maxConnections = maxConnections; + } + + public int getMaxConnections() { + return maxConnections; + } + + public int getConnections() { + synchronized (this) { + return connections; + } + } + + public void setMaxConnections(int maxConnections) { + synchronized (this) { + this.maxConnections = maxConnections; + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + boolean activate = false; + synchronized (this) { + if (connections < maxConnections) { + connections++; + activate = true; + } + } + + if (activate) { + super.channelActive(ctx); + } else { + ctx.close(); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + synchronized (this) { + connections--; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/NioDecoderWrapper.java Fri Sep 13 12:06:13 2019 +0200 @@ -0,0 +1,55 @@ +package com.passus.st.utils; + +import com.passus.data.DataDecoder; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class NioDecoderWrapper<T> extends SimpleChannelInboundHandler<ByteBuf> { + + private static final Logger LOGGER = LogManager.getLogger(NioDecoderWrapper.class); + + private final DataDecoder<T> decoder; + + public NioDecoderWrapper(DataDecoder<T> decoder) { + this.decoder = decoder; + } + + @Override + protected final void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] data; + int startOffset; + int endOffset; + int length = in.readableBytes(); + if (in.hasArray()) { + startOffset = in.arrayOffset() + in.readerIndex(); + endOffset = startOffset + length; + data = in.array(); + } else { + startOffset = 0; + endOffset = length; + data = new byte[length]; + in.getBytes(in.readerIndex(), data); + } + + int res = decoder.decode(data, startOffset, endOffset); + in.readerIndex(in.readerIndex() + res); + if (decoder.state() == DataDecoder.STATE_ERROR) { + LOGGER.error("Decoder error. " + decoder.getLastError()); + ctx.close(); + onDecoderError(decoder.getLastError(), ctx); + } else if (decoder.state() == DataDecoder.STATE_FINISHED) { + onDataDecoded(decoder.getResult(), ctx); + } + } + + protected void onDataDecoded(T data, ChannelHandlerContext ctx) { + + } + + protected void onDecoderError(String errorMsg, ChannelHandlerContext ctx) { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/NioServer.java Fri Sep 13 12:06:13 2019 +0200 @@ -0,0 +1,135 @@ +package com.passus.st.utils; + +import com.passus.commons.service.Service; +import com.passus.commons.service.ServiceException; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NioServer implements Service { + + private static final Logger LOGGER = LogManager.getLogger(NioServer.class); + + private String address = "localhost"; + + private int port = 5000; + + private ChannelFuture channel; + + private EventLoopGroup masterGroup; + + private EventLoopGroup slaveGroup; + + private final NioConnectionsCounter connectionsCounter = new NioConnectionsCounter(); + + private ChannelHandler serverHandler; + + private boolean started; + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getConnections() { + return connectionsCounter.getConnections(); + } + + public int getMaxConnections() { + return connectionsCounter.getMaxConnections(); + } + + public void setMaxConnections(int maxConnections) { + connectionsCounter.setMaxConnections(maxConnections); + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (started) { + return; + } + + try { + masterGroup = new NioEventLoopGroup(); + slaveGroup = new NioEventLoopGroup(); + ServerBootstrap bs = new ServerBootstrap(); + bs.option(ChannelOption.SO_BACKLOG, 1024); + + bs.group(masterGroup, slaveGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOGGER.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); + ctx.close(); + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(connectionsCounter); + p.addLast(serverHandler); + } + + }); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Starting Netty server {}:{}.", address, port); + } + + channel = bs.bind(address, port); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Server {}:{} started.", address, port); + } + this.started = true; + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + stop(); + throw new ServiceException(ex.getMessage(), ex); + } + } + + @Override + public void stop() { + if (!started) { + return; + } + + slaveGroup.shutdownGracefully(); + masterGroup.shutdownGracefully(); + + try { + LOGGER.debug("Stopping Netty server {}:{}.", address, port); + channel.channel().closeFuture().sync(); + } catch (InterruptedException ignore) { + } + + slaveGroup = null; + masterGroup = null; + channel = null; + + started = false; + } +}