Mercurial > stress-tester
changeset 559:4b26fdc22ef2
reporter - transport - refactoring, configurable threads
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java Thu Sep 21 13:49:27 2017 +0200 @@ -1,7 +1,5 @@ package com.passus.st.avro; -import com.passus.st.reporter.protocol.MetricRecord; -import com.passus.st.reporter.protocol.MetricsCollectionRecord; import com.passus.st.reporter.protocol.Reporter; import com.passus.utils.AllocationUtils; import java.io.IOException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java Thu Sep 21 13:49:27 2017 +0200 @@ -1,14 +1,62 @@ package com.passus.st.reporter; import com.passus.commons.service.Service; +import java.net.InetSocketAddress; +import java.util.concurrent.BlockingQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * * @author mikolaj.podbielski */ -public interface ReporterClient extends Service { +public abstract class ReporterClient implements Service { - public boolean send(Object object); + protected final Logger logger = LogManager.getLogger(getClass()); + protected final MetricConverter converter = new MetricConverter(); + protected final InetSocketAddress serverAddress; + protected final BlockingQueue<Object> queue; + protected final boolean dropIfQueueFull; - void waitForEmptyQueue() throws InterruptedException; + protected volatile int droppedMessages; + protected volatile boolean working; + + public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) { + this.serverAddress = serverAddress; + this.queue = queue; + this.dropIfQueueFull = dropIfQueueFull; + } + + public boolean send(Object object) { + if (dropIfQueueFull) { + if (!queue.offer(object)) { + ++droppedMessages; + logger.debug("Could not enqueue message."); + return false; + } + } else { + try { + queue.put(object); + } catch (InterruptedException ignore) { + ++droppedMessages; + return false; + } + } + return true; + } + + @Override + public boolean isStarted() { + return working; + } + + public void waitForEmptyQueue() throws InterruptedException { + while (queue.isEmpty() == false) { + Thread.sleep(100); + } + } + + public int getDroppedMessages() { + return droppedMessages; + } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/SnmpLogger.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/SnmpLogger.java Thu Sep 21 13:49:27 2017 +0200 @@ -54,6 +54,7 @@ * * @param add must be quasi url, (ex. udp:172.16.60.101/161) * @param comm name of the SNMP community + * @param period */ public SnmpLogger(final String add, final String comm, final int period) { @@ -71,18 +72,19 @@ } + @Override public void run(){ try { builder = new StringBuilder(200); builder.append(System.currentTimeMillis()); builder.append(";"); - String cpuLoad1m = this.getAsString(oid_cpuLoad1m); - String cpuLoad5m = this.getAsString(oid_cpuLoad5m); - String cpuLoad15m = this.getAsString(oid_cpuLoad15m); - String ramFree = this.getAsString(oid_ramFree); - String swapFree = this.getAsString(oid_swapFree); - String ramTotal = this.getAsString(oid_ramTotal); - String swapTotal = this.getAsString(oid_swapTotal); + String cpuLoad1m = getAsString(oid_cpuLoad1m); + String cpuLoad5m = getAsString(oid_cpuLoad5m); + String cpuLoad15m = getAsString(oid_cpuLoad15m); + String ramFree = getAsString(oid_ramFree); + String swapFree = getAsString(oid_swapFree); + String ramTotal = getAsString(oid_ramTotal); + String swapTotal = getAsString(oid_swapTotal); builder.append(cpuLoad1m); builder.append(";"); builder.append(cpuLoad5m); @@ -133,7 +135,7 @@ * @throws IOException */ public String getAsString(OID oid) throws IOException { - ResponseEvent event = get(new OID[]{oid}); + ResponseEvent event = get(oid); return event.getResponse().get(0).getVariable().toString(); } @@ -144,7 +146,7 @@ * @return * @throws IOException */ - public ResponseEvent get(OID oids[]) throws IOException { + public ResponseEvent get(OID... oids) throws IOException { PDU pdu = new PDU(); for (OID oid : oids) { pdu.add(new VariableBinding(oid));
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java Thu Sep 21 13:49:27 2017 +0200 @@ -3,7 +3,6 @@ import com.passus.commons.metric.Metric; import com.passus.commons.metric.MetricsCollection; import com.passus.commons.utils.SimpleThreadFactory; -import com.passus.st.reporter.MetricConverter; import com.passus.st.reporter.ReporterClient; import com.passus.st.reporter.protocol.MetricRecord; import com.passus.st.reporter.protocol.MetricsCollectionRecord; @@ -11,7 +10,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -19,56 +17,31 @@ import org.apache.avro.AvroRemoteException; import org.apache.avro.ipc.NettyTransceiver; import org.apache.avro.ipc.specific.SpecificRequestor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; /** * * @author mikolaj.podbielski */ -public class AvroRpcReporterClient implements ReporterClient { - - private static final Logger LOGGER = LogManager.getLogger(AvroRpcReporterClient.class); +public class AvroRpcReporterClient extends ReporterClient { - private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096); - private final MetricConverter converter = new MetricConverter(); - private final InetSocketAddress serverAddress; - private volatile boolean working; private SenderThread sender; public AvroRpcReporterClient(InetSocketAddress serverAddress) { - this.serverAddress = serverAddress; - } - - @Override - public boolean send(Object object) { - if (!queue.offer(object)) { - System.out.println("reporter queue full"); - LOGGER.debug("Could not enqueue message."); - return false; - } - return true; - } - - @Override - public boolean isStarted() { - return working; + super(serverAddress, new ArrayBlockingQueue<>(4096), true); } @Override public void start() { - if (working) { - return; + if (!working) { + working = true; + sender = new SenderThread(); + try { + sender.connect(); + } catch (IOException ignore) { + } + sender.start(); } - - working = true; - sender = new SenderThread(); - try { - sender.connect(); - } catch (IOException ignore) { - } - sender.start(); } @Override @@ -84,13 +57,6 @@ } } - @Override - public void waitForEmptyQueue() throws InterruptedException { - while (queue.isEmpty() == false) { - Thread.sleep(100); - } - } - private static Executor pool(String name, int corePoolSize) { SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name); return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, @@ -163,22 +129,22 @@ System.out.println(metric.getClass()); continue; } - LOGGER.trace("result: {}", result); + logger.trace("result: {}", result); } } catch (InterruptedException ex) { - LOGGER.trace("Queue.take() was interrupted."); + logger.trace("Queue.take() was interrupted."); } catch (AvroRemoteException ex) { - LOGGER.warn("Could not send.", ex); + logger.warn("Could not send.", ex); } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); + logger.warn("Could not connect.", ex); } catch (Exception ex) { - LOGGER.error(ex); + logger.error(ex); } } disconnect(); - LOGGER.debug("Sender stopped."); + logger.debug("Sender stopped."); } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Thu Sep 21 13:49:27 2017 +0200 @@ -3,17 +3,17 @@ import com.passus.st.reporter.ReporterImpl; import com.passus.st.reporter.SnmpLogger; import com.passus.st.reporter.protocol.Reporter; +import com.passus.st.utils.CliUtils; +import static com.passus.st.utils.CliUtils.option; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.Responder; import org.apache.avro.ipc.Server; import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import static com.passus.st.utils.CliUtils.option; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.ParseException; /** @@ -24,18 +24,21 @@ static final int PORT = 11111; - static void printHelp(Options options) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("[options]", "description", options, ""); - } - - static void printError(String msg) { - System.err.print(msg); - System.exit(1); + static SnmpLogger create(CommandLine cl) throws ParseException { + if (cl.hasOption("s")) { + // SNMP listener is enabled + String snmpAddr = cl.getOptionValue("s"); + String snmpCommunity = cl.hasOption("co") ? cl.getOptionValue("co") : "passus"; + int snmpPeriod = cl.hasOption("p") ? Integer.parseUnsignedInt(cl.getOptionValue("p")) : 5; + return new SnmpLogger(snmpAddr, snmpCommunity, snmpPeriod); + } else if (cl.hasOption("co") || cl.hasOption("p")) { + // Community option without SNMP option specified + throw new ParseException("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address."); + } + return null; } public static void main(String[] args) throws IOException { - final Options options = new Options(); options.addOption(option("s", "snmp").desc("Collect SNMP metrics.") @@ -60,24 +63,11 @@ SnmpLogger snmp; try { CommandLine cl = new DefaultParser().parse(options, args); - if (cl.hasOption("s")) { - // SNMP listener is enabled - String snmpAddr = cl.getOptionValue("s"); - String snmpCommunity = cl.hasOption("co") ? cl.getOptionValue("co") : "passus"; - int snmpPeriod = cl.hasOption("p") ? Integer.parseUnsignedInt(cl.getOptionValue("p")) : 5; - snmp = new SnmpLogger(snmpAddr, snmpCommunity, snmpPeriod); - } else if (cl.hasOption("co") || cl.hasOption("p")) { - // Community option without SNMP option specified - printError("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address."); - return; - } else { - snmp = null; - } - + snmp = create(cl); merge = cl.hasOption("m"); } catch (ParseException ex) { System.out.println(ex.getMessage()); - printHelp(options); + CliUtils.printHelp(options); return; }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java Thu Sep 21 13:49:27 2017 +0200 @@ -3,6 +3,8 @@ import com.passus.st.reporter.protocol.MetricRecord; import com.passus.st.reporter.protocol.MetricsCollectionRecord; import com.passus.st.reporter.protocol.Reporter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.avro.AvroRemoteException; @@ -15,7 +17,7 @@ * * @author mikolaj.podbielski */ -public class ReporterDispatcher { +public class ReporterDispatcher extends SimpleChannelInboundHandler<byte[]> { public static final int TYPE_METRIC = 1; public static final int TYPE_METRICS_COLLECTION = 2; @@ -33,9 +35,10 @@ this.reporter = reporter; } - public void dispatch(byte[] frame) throws AvroRemoteException, IOException { - int code = getInt4(frame, 0); - ByteArrayInputStream bais = new ByteArrayInputStream(frame, 4, frame.length - 4); + @Override + public void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws AvroRemoteException, IOException { + int code = getInt4(msg, 0); + ByteArrayInputStream bais = new ByteArrayInputStream(msg, 4, msg.length - 4); BinaryDecoder decoder = decoderFactory.binaryDecoder(bais, null); switch (code) {
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Thu Sep 21 13:49:27 2017 +0200 @@ -1,17 +1,15 @@ package com.passus.st.reporter.trx; +import com.passus.st.reporter.ReporterImpl; import com.passus.st.reporter.protocol.Reporter; -import com.passus.st.reporter.ReporterImpl; import com.passus.st.utils.CliUtils; import static com.passus.st.utils.CliUtils.option; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -20,10 +18,11 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.bytes.ByteArrayDecoder; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import static com.passus.st.utils.CliUtils.option; /** * @@ -34,9 +33,7 @@ public static final int PORT = 11111; public static final boolean EPOLL = false; - private final String bindHost; - private final int port; - private final int numThreads; + private final SocketAddress serverAddress; private final int maxFrameSize = 8192; private final EventLoopGroup bossGroup; @@ -45,18 +42,16 @@ private final Reporter reporter; - public ServerMain(String bindHost, int port, int numThreads, Reporter reporter) { - this.bindHost = bindHost; - this.port = port; - this.numThreads = numThreads; + public ServerMain(SocketAddress serverAddress, Reporter reporter) { + this.serverAddress = serverAddress; this.reporter = reporter; if (EPOLL) { - bossGroup = new EpollEventLoopGroup(numThreads); + bossGroup = new EpollEventLoopGroup(1); workerGroup = new EpollEventLoopGroup(); socketChannelClass = EpollServerSocketChannel.class; } else { - bossGroup = new NioEventLoopGroup(numThreads); + bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); socketChannelClass = NioServerSocketChannel.class; } @@ -69,10 +64,10 @@ boot.option(ChannelOption.SO_REUSEADDR, true); boot.group(bossGroup, workerGroup) .channel(socketChannelClass) - .childHandler(new TestServerInitializer()); + .childHandler(new ServerInitializer()); - Channel ch = boot.bind(bindHost, port).sync().channel(); - System.err.println("Server started " + bindHost + ":" + port + " epoll: " + EPOLL); + Channel ch = boot.bind(serverAddress).sync().channel(); + System.err.println("Server started " + serverAddress + " epoll: " + EPOLL); } catch (Exception ex) { ex.printStackTrace(); } @@ -84,36 +79,19 @@ System.err.println("Server stopped."); } - public class TestServerInitializer extends ChannelInitializer<SocketChannel> { + public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); - pipeline.addLast(new ReporterChannelInboundHandler(reporter)); - } - } - - public static class ReporterChannelInboundHandler extends SimpleChannelInboundHandler<byte[]> { - - private final ReporterDispatcher dispatcher; - - public ReporterChannelInboundHandler(Reporter reporter) { - dispatcher = new ReporterDispatcher(reporter); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception { - dispatcher.dispatch(data); + pipeline.addLast(new ReporterDispatcher(reporter)); } } public static void main(String[] args) throws IOException { - final String syntax = "server <bind address> [-t threads=500]"; final Options options = new Options(); - options.addOption(option("t", "threads", "number of worker threads", "threads")); options.addOption(option("m", "merged").desc("Write file with merged request and response data") .hasArg(false) @@ -123,17 +101,16 @@ try { CommandLine cl = new DefaultParser().parse(options, args); String[] clArgs = cl.getArgs(); - if (clArgs.length != 1) { - CliUtils.printHelp(options, syntax); + if (clArgs.length != 0) { + CliUtils.printHelp(options); return; } - int threads = Integer.parseInt(cl.getOptionValue('t', "500")); - ReporterImpl reporter = new ReporterImpl(cl.hasOption("m")); reporter.setVerbose(false); - ServerMain server = new ServerMain(clArgs[0], PORT, threads, reporter); + InetSocketAddress serverAddress = new InetSocketAddress(PORT); + ServerMain server = new ServerMain(serverAddress, reporter); server.start(); Runnable shutdown = () -> { @@ -142,7 +119,7 @@ }; Runtime.getRuntime().addShutdownHook(new Thread(shutdown)); } catch (org.apache.commons.cli.ParseException ex) { - CliUtils.printHelp(options, syntax); + CliUtils.printHelp(options); } } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java Thu Sep 21 13:49:27 2017 +0200 @@ -2,7 +2,6 @@ import com.passus.commons.metric.Metric; import com.passus.commons.metric.MetricsCollection; -import com.passus.st.reporter.MetricConverter; import com.passus.st.reporter.ReporterClient; import com.passus.st.reporter.protocol.MetricRecord; import com.passus.st.reporter.protocol.MetricsCollectionRecord; @@ -13,76 +12,46 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import org.apache.avro.AvroRemoteException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * * @author mikolaj.podbielski */ -public class SocketReporterClient implements ReporterClient { - - private static final Logger LOGGER = LogManager.getLogger(SocketReporterClient.class); +public class SocketReporterClient extends ReporterClient { - private final MetricConverter converter = new MetricConverter(); - private final InetSocketAddress serverAddress; - private final BlockingQueue<Object> queue; - private final boolean dropIfQueueFull; - private volatile boolean working; -// private SenderThread sender; - private SenderThread[] senders = new SenderThread[4]; + private final SenderThread[] senders; - public SocketReporterClient(InetSocketAddress serverAddress) { - this.serverAddress = serverAddress; - this.queue = new ArrayBlockingQueue<>(4096); - this.dropIfQueueFull = true; + public SocketReporterClient(InetSocketAddress serverAddress, int threads) { + this(serverAddress, threads, new ArrayBlockingQueue<>(4096), true); } - public SocketReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) { - this.serverAddress = serverAddress; - this.queue = queue; - this.dropIfQueueFull = dropIfQueueFull; - } - - @Override - public boolean isStarted() { - return working; + public SocketReporterClient(InetSocketAddress serverAddress, int threads, BlockingQueue<Object> queue, boolean dropIfQueueFull) { + super(serverAddress, queue, dropIfQueueFull); + if (threads < 1 || threads > 8) { + throw new IllegalArgumentException("Allowed between 1 and 8 reporter threads."); + } + senders = new SenderThread[threads]; } @Override public void start() { - if (working) { - return; + if (!working) { + working = true; + for (int i = 0; i < senders.length; i++) { + senders[i] = new SenderThread(); + try { + senders[i].connect(); + } catch (IOException ignore) { + } + senders[i].start(); + } } - - working = true; -// sender = new SenderThread(); -// try { -// sender.connect(); -// } catch (IOException ignore) { -// } -// sender.start(); - for (int i = 0; i < senders.length; i++) { - senders[i] = new SenderThread(); - try { - senders[i].connect(); - } catch (IOException ignore) { - } - senders[i].start(); - } - } @Override public void stop() { if (working) { working = false; -// sender.interrupt(); -// try { -// sender.join(500); -// } catch (Exception ex) { -// } -// sender = null; for (int i = 0; i < senders.length; i++) { senders[i].interrupt(); try { @@ -94,31 +63,6 @@ } } - @Override - public void waitForEmptyQueue() throws InterruptedException { - while (queue.isEmpty() == false) { - Thread.sleep(100); - } - } - - @Override - public boolean send(Object object) { - if (dropIfQueueFull) { - if (!queue.offer(object)) { - System.out.println("reporter queue full"); - LOGGER.debug("Could not enqueue message."); - return false; - } - } else { - try { - queue.put(object); - } catch (InterruptedException ignore) { - return false; - } - } - return true; - } - private class SenderThread extends Thread { private final ReporterEncoder encoder = new ReporterEncoder(); @@ -130,7 +74,7 @@ socket = new Socket(serverAddress.getAddress(), serverAddress.getPort()); os = new DataOutputStream(socket.getOutputStream()); } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); + logger.warn("Could not connect.", ex); } } @@ -190,19 +134,19 @@ } } } catch (InterruptedException ex) { - LOGGER.trace("Queue.take() was interrupted."); + logger.trace("Queue.take() was interrupted."); } catch (AvroRemoteException ex) { - LOGGER.warn("Could not send.", ex); + logger.warn("Could not send.", ex); } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); + logger.warn("Could not connect.", ex); } catch (Exception ex) { - LOGGER.error(ex); + logger.error(ex); } } disconnect(); - LOGGER.debug("Sender stopped."); + logger.debug("Sender stopped."); } }
--- a/stress-tester-reporter/src/main/java/com/passus/st/utils/CliUtils.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/utils/CliUtils.java Thu Sep 21 13:49:27 2017 +0200 @@ -32,4 +32,9 @@ HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(syntax, "---", options, "==="); } + + public static void printHelp(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("[options]", "description", options, ""); + } }
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Thu Sep 21 13:49:27 2017 +0200 @@ -7,6 +7,7 @@ import com.passus.st.reporter.trx.ServerMain; import com.passus.st.reporter.trx.SocketReporterClient; import java.io.DataOutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import java.util.HashMap; import java.util.Map; @@ -22,7 +23,7 @@ public static void main(String[] args) throws Exception { ReporterImpl reporter = new ReporterImpl(); reporter.setVerbose(true); - ServerMain server = new ServerMain("localhost", ServerMain.PORT, 100, reporter); + ServerMain server = new ServerMain(new InetSocketAddress(ServerMain.PORT), reporter); server.start(); System.out.println("server started"); @@ -57,7 +58,7 @@ } static void client() throws Exception { - SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS); + SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS, 1); rc.start(); rc.send(METRIC); rc.waitForEmptyQueue();
--- a/stress-tester/src/main/java/com/passus/st/Main.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Thu Sep 21 13:49:27 2017 +0200 @@ -157,10 +157,13 @@ .hasArg().argName("ip") .build() ); - options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.") .hasArg(false) .build()); + options.addOption(option("nrt", "newReporterThreads").desc("Number of sending threads. (range 1 - 8, default 2)") + .hasArg().argName("threads") + .build() + ); options.addOption(option("wf", "writeFile").desc("Write result to file.") .hasArg().argName("file").optionalArg(true) @@ -310,7 +313,8 @@ InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri")); InetSocketAddress socketAddr = new InetSocketAddress(addr, port); if (cl.hasOption("nrp")) { - reporterClient = new SocketReporterClient(socketAddr); + int threads = Integer.parseInt(cl.getOptionValue("nrt", "2")); + reporterClient = new SocketReporterClient(socketAddr, threads); } else { reporterClient = new AvroRpcReporterClient(socketAddr); } @@ -418,6 +422,7 @@ if (reporterClient != null) { reporterClient.waitForEmptyQueue(); reporterClient.stop(); + System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages()); } if (summaryListener != null) { summaryListener.close();
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 21 11:56:09 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 21 13:49:27 2017 +0200 @@ -43,8 +43,6 @@ import org.apache.commons.cli.ParseException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import static com.passus.st.Main.printMetrics; -import static com.passus.st.utils.CliUtils.option; /** * @@ -93,7 +91,7 @@ Log4jConfigurationFactory.enableFactory(logLevel); InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri")); - SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111)); + SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111), 2); reporterClient.start(); LocalHandler lh = new LocalHandler(reporterClient, cl.hasOption("ps"));