Mercurial > stress-tester
changeset 560:f373af509300
reporter - transport - refactoring
author | Devel 1 |
---|---|
date | Thu, 21 Sep 2017 13:59:29 +0200 |
parents | 4b26fdc22ef2 |
children | d836ea2aefed |
files | stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/Server.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java |
diffstat | 4 files changed, 86 insertions(+), 87 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/Server.java Thu Sep 21 13:59:29 2017 +0200 @@ -0,0 +1,82 @@ +package com.passus.st.reporter.trx; + +import com.passus.st.reporter.protocol.Reporter; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import java.net.SocketAddress; + +/** + * + * @author mikolaj.podbielski + */ +public class Server { + public static final boolean EPOLL = false; + + private final SocketAddress serverAddress; + private final int maxFrameSize = 8192; + + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final Class socketChannelClass; + + private final Reporter reporter; + + public Server(SocketAddress serverAddress, Reporter reporter) { + this.serverAddress = serverAddress; + this.reporter = reporter; + + if (EPOLL) { + bossGroup = new EpollEventLoopGroup(1); + workerGroup = new EpollEventLoopGroup(); + socketChannelClass = EpollServerSocketChannel.class; + } else { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + socketChannelClass = NioServerSocketChannel.class; + } + } + + public void start() { + try { + ServerBootstrap boot = new ServerBootstrap(); + boot.option(ChannelOption.SO_BACKLOG, 100_000); + boot.option(ChannelOption.SO_REUSEADDR, true); + boot.group(bossGroup, workerGroup) + .channel(socketChannelClass) + .childHandler(new ServerInitializer()); + + Channel ch = boot.bind(serverAddress).sync().channel(); + System.err.println("Server started " + serverAddress + " epoll: " + EPOLL); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + public void stop() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + System.err.println("Server stopped."); + } + + public class ServerInitializer extends ChannelInitializer<SocketChannel> { + + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); + pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); + pipeline.addLast(new ReporterDispatcher(reporter)); + } + } +}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Thu Sep 21 13:49:27 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Thu Sep 21 13:59:29 2017 +0200 @@ -1,25 +1,10 @@ package com.passus.st.reporter.trx; import com.passus.st.reporter.ReporterImpl; -import com.passus.st.reporter.protocol.Reporter; import com.passus.st.utils.CliUtils; import static com.passus.st.utils.CliUtils.option; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.bytes.ByteArrayDecoder; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketAddress; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; @@ -31,64 +16,6 @@ public class ServerMain { public static final int PORT = 11111; - public static final boolean EPOLL = false; - - private final SocketAddress serverAddress; - private final int maxFrameSize = 8192; - - private final EventLoopGroup bossGroup; - private final EventLoopGroup workerGroup; - private final Class socketChannelClass; - - private final Reporter reporter; - - public ServerMain(SocketAddress serverAddress, Reporter reporter) { - this.serverAddress = serverAddress; - this.reporter = reporter; - - if (EPOLL) { - bossGroup = new EpollEventLoopGroup(1); - workerGroup = new EpollEventLoopGroup(); - socketChannelClass = EpollServerSocketChannel.class; - } else { - bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(); - socketChannelClass = NioServerSocketChannel.class; - } - } - - public void start() { - try { - ServerBootstrap boot = new ServerBootstrap(); - boot.option(ChannelOption.SO_BACKLOG, 100_000); - boot.option(ChannelOption.SO_REUSEADDR, true); - boot.group(bossGroup, workerGroup) - .channel(socketChannelClass) - .childHandler(new ServerInitializer()); - - Channel ch = boot.bind(serverAddress).sync().channel(); - System.err.println("Server started " + serverAddress + " epoll: " + EPOLL); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - public void stop() { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - System.err.println("Server stopped."); - } - - public class ServerInitializer extends ChannelInitializer<SocketChannel> { - - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); - pipeline.addLast(new ReporterDispatcher(reporter)); - } - } public static void main(String[] args) throws IOException { final Options options = new Options(); @@ -100,8 +27,7 @@ try { CommandLine cl = new DefaultParser().parse(options, args); - String[] clArgs = cl.getArgs(); - if (clArgs.length != 0) { + if (cl.getArgs().length != 0) { CliUtils.printHelp(options); return; } @@ -110,7 +36,7 @@ reporter.setVerbose(false); InetSocketAddress serverAddress = new InetSocketAddress(PORT); - ServerMain server = new ServerMain(serverAddress, reporter); + Server server = new Server(serverAddress, reporter); server.start(); Runnable shutdown = () -> {
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java Thu Sep 21 13:49:27 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java Thu Sep 21 13:59:29 2017 +0200 @@ -118,15 +118,6 @@ connect(); } if (isConnected()) { -// CharSequence result; -// if (record instanceof MetricRecord) { -// result = proxy.handleMetric((MetricRecord) record); -// } else if (record instanceof MetricsCollectionRecord) { -// result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); -// } else { -// continue; -// } -// LOGGER.trace("result: {}", result); synchronized (this) { os.writeInt(bytes.length + 4); os.writeInt(code);
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Thu Sep 21 13:49:27 2017 +0200 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Thu Sep 21 13:59:29 2017 +0200 @@ -4,7 +4,7 @@ import com.passus.st.reporter.protocol.MetricRecord; import com.passus.st.reporter.trx.ReporterDispatcher; import com.passus.st.reporter.trx.ReporterEncoder; -import com.passus.st.reporter.trx.ServerMain; +import com.passus.st.reporter.trx.Server; import com.passus.st.reporter.trx.SocketReporterClient; import java.io.DataOutputStream; import java.net.InetSocketAddress; @@ -23,7 +23,7 @@ public static void main(String[] args) throws Exception { ReporterImpl reporter = new ReporterImpl(); reporter.setVerbose(true); - ServerMain server = new ServerMain(new InetSocketAddress(ServerMain.PORT), reporter); + Server server = new Server(new InetSocketAddress(ServerMain.PORT), reporter); server.start(); System.out.println("server started");