Mercurial > stress-tester
changeset 557:f65e374eb0f8
reporter - transport
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AbstractAvroBenchmark.java Wed Sep 20 13:29:12 2017 +0200 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AbstractAvroBenchmark.java Wed Sep 20 23:11:11 2017 +0200 @@ -3,8 +3,11 @@ import static com.passus.commons.collection.FluentBuilder.e; import static com.passus.commons.collection.FluentBuilder.map; import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import com.passus.st.reporter.protocol.Reporter; import java.net.InetSocketAddress; import java.util.Map; +import org.apache.avro.AvroRemoteException; /** * @@ -12,7 +15,7 @@ */ public class AbstractAvroBenchmark { - protected final InetSocketAddress serverAddress = new InetSocketAddress(11111); + protected final InetSocketAddress serverAddress = new InetSocketAddress("localhost", 11111); protected final MetricRecord bigMetricRecord; protected final MetricRecord smallMetricRecord; @@ -25,4 +28,18 @@ bigMetricRecord = new MetricRecord("big", map); smallMetricRecord = new MetricRecord("small", nested); } + + protected static class DummyReporter implements Reporter { + + @Override + public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException { + return "OK"; + } + + @Override + public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException { + return "OK"; + } + + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,74 @@ +package com.passus.st.avro; + +import com.passus.st.reporter.trx.ServerMain; +import com.passus.st.reporter.trx.ReporterClient; +import com.passus.st.reporter.protocol.Reporter; +import com.passus.utils.AllocationUtils; +import java.io.IOException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.avro.AvroRemoteException; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * + * @author mikolaj.podbielski + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Fork(value = 1) +@Measurement(iterations = 7) +@Warmup(iterations = 7) +public class AvroCustomBenchmark extends AbstractAvroBenchmark { + + private final ServerMain server = new ServerMain("localhost", 11111, 500, new DummyReporter()); + private final ReporterClient client = new ReporterClient(serverAddress, new SynchronousQueue<>(), false); + private final TestMetric metric = new TestMetric(); + + @Setup + public void setup() throws IOException { + server.start(); + client.start(); + } + + @TearDown + public void tearDown() { + server.stop(); + client.stop(); + } + + @Benchmark + public Object metric() throws AvroRemoteException { + boolean send = client.send(metric); + if (send == false) { + throw new IllegalStateException(); + } + return send; + } + + public static void main(String[] args) throws Exception { + Options opt = new OptionsBuilder().include(AvroCustomBenchmark.class.getSimpleName() + ".*").build(); + new Runner(opt).run(); + + AllocationUtils au = new AllocationUtils(); + AvroCustomBenchmark benchmark = new AvroCustomBenchmark(); + benchmark.setup(); + au.checkAllocation("metric", benchmark::metric); + benchmark.client.send(new TestMetric()); + benchmark.tearDown(); + } +}
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java Wed Sep 20 13:29:12 2017 +0200 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroRpcBenchmark.java Wed Sep 20 23:11:11 2017 +0200 @@ -81,17 +81,4 @@ benchmark.tearDown(); } - private static class DummyReporter implements Reporter { - - @Override - public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException { - return "OK"; - } - - @Override - public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException { - return "OK"; - } - - } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/TestMetric.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,78 @@ +package com.passus.st.avro; + +import com.passus.commons.metric.Metric; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * + * @author mikolaj.podbielski + */ +public class TestMetric implements Metric { + + private final HashMap<String, Serializable> attributes = new HashMap<>(); + + { + attributes.put("String", "value"); + attributes.put("Long", 12345L); + attributes.put("Integer", 123); + attributes.put("Float", 123.45f); + } + + @Override + public String getName() { + return "test"; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public void activate() { + } + + @Override + public void deactivate() { + } + + @Override + public Class getAttributeClass(String name) { + Serializable value = attributes.get(name); + return value.getClass(); + } + + @Override + public boolean hasAttribute(String name) { + return attributes.containsKey(name); + } + + @Override + public Set<String> getAttributesName() { + return attributes.keySet(); + } + + @Override + public Serializable getAttributeValue(String name) { + return attributes.get(name); + } + + @Override + public Map<String, Serializable> getAttributesValue() { + return attributes; + } + + @Override + public void update(Metric metric) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void reset() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + +}
--- a/stress-tester-reporter/pom.xml Wed Sep 20 13:29:12 2017 +0200 +++ b/stress-tester-reporter/pom.xml Wed Sep 20 23:11:11 2017 +0200 @@ -79,6 +79,12 @@ <artifactId>commons-cli</artifactId> <version>1.3.1</version> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.6.Final</version> + <type>jar</type> + </dependency> </dependencies> <build>
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Wed Sep 20 13:29:12 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Wed Sep 20 23:11:11 2017 +0200 @@ -22,7 +22,6 @@ public class ServerMain { static final int PORT = 11111; - static SnmpLogger snmp; static void printHelp(Options options) { HelpFormatter formatter = new HelpFormatter(); @@ -57,6 +56,7 @@ ); boolean merge; + SnmpLogger snmp; try { CommandLine cl = new DefaultParser().parse(options, args); if (cl.hasOption("s")) { @@ -69,6 +69,8 @@ // Community option without SNMP option specified printError("Options <snmpCommunity> and <snmpPeriod> require to specify SNMP Address."); return; + } else { + snmp = null; } merge = cl.hasOption("m");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,205 @@ +package com.passus.st.reporter.trx; + +import com.passus.commons.metric.Metric; +import com.passus.commons.metric.MetricsCollection; +import com.passus.commons.service.Service; +import com.passus.st.reporter.MetricConverter; +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.avro.AvroRemoteException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author mikolaj.podbielski + */ +public class ReporterClient implements Service { + + private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class); + + private final MetricConverter converter = new MetricConverter(); + private final InetSocketAddress serverAddress; + private final BlockingQueue<Object> queue; + private final boolean dropIfQueueFull; + private volatile boolean working; +// private SenderThread sender; + private SenderThread[] senders = new SenderThread[4]; + + public ReporterClient(InetSocketAddress serverAddress) { + this.serverAddress = serverAddress; + this.queue = new ArrayBlockingQueue<>(4096); + this.dropIfQueueFull = true; + } + + public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) { + this.serverAddress = serverAddress; + this.queue = queue; + this.dropIfQueueFull = dropIfQueueFull; + } + + @Override + public boolean isStarted() { + return working; + } + + @Override + public void start() { + if (working) { + return; + } + + working = true; +// sender = new SenderThread(); +// try { +// sender.connect(); +// } catch (IOException ignore) { +// } +// sender.start(); + for (int i = 0; i < senders.length; i++) { + senders[i] = new SenderThread(); + try { + senders[i].connect(); + } catch (IOException ignore) { + } + senders[i].start(); + } + + } + + @Override + public void stop() { + if (working) { + working = false; +// sender.interrupt(); +// try { +// sender.join(500); +// } catch (Exception ex) { +// } +// sender = null; + for (int i = 0; i < senders.length; i++) { + senders[i].interrupt(); + try { + senders[i].join(500); + } catch (Exception ex) { + } + senders[i] = null; + } + } + } + + public void waitForEmptyQueue() throws InterruptedException { + while (queue.isEmpty() == false) { + Thread.sleep(100); + } + } + + public boolean send(Object object) { + if (dropIfQueueFull) { + if (!queue.offer(object)) { + LOGGER.debug("Could not enqueue message."); + return false; + } + } else { + try { + queue.put(object); + } catch (InterruptedException ignore) { + return false; + } + } + return true; + } + + private class SenderThread extends Thread { + + private final ReporterEncoder encoder = new ReporterEncoder(); + private Socket socket; // only synchronized access + private volatile DataOutputStream os; + + private synchronized void connect() throws IOException { + try { + socket = new Socket(serverAddress.getAddress(), serverAddress.getPort()); + os = new DataOutputStream(socket.getOutputStream()); + } catch (IOException ex) { + LOGGER.warn("Could not connect.", ex); + } + } + + private synchronized void disconnect() { + try { + if (socket != null) { + socket.close(); + socket = null; + os = null; + } + } catch (IOException ignore) { + } + } + + private boolean isConnected() { + return os != null; + } + + @Override + public void run() { + while (working) { + try { + // TODO: refactor + Object metric = queue.take(); + int code; + byte[] bytes; + if (metric instanceof Metric) { + MetricRecord record = converter.convert((Metric) metric, false); + code = ReporterDispatcher.TYPE_METRIC; + bytes = encoder.encode(record); + } else if (metric instanceof MetricsCollection) { + MetricsCollectionRecord record = converter.convert((MetricsCollection) metric); + code = ReporterDispatcher.TYPE_METRICS_COLLECTION; + bytes = encoder.encode(record); + } else { + continue; + } + + if (!isConnected()) { + connect(); + } + if (isConnected()) { +// CharSequence result; +// if (record instanceof MetricRecord) { +// result = proxy.handleMetric((MetricRecord) record); +// } else if (record instanceof MetricsCollectionRecord) { +// result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); +// } else { +// continue; +// } +// LOGGER.trace("result: {}", result); + synchronized (this) { + os.writeInt(bytes.length + 4); + os.writeInt(code); + os.write(bytes); + } + } + } catch (InterruptedException ex) { + LOGGER.trace("Queue.take() was interrupted."); + } catch (AvroRemoteException ex) { + LOGGER.warn("Could not send.", ex); + } catch (IOException ex) { + LOGGER.warn("Could not connect.", ex); + } catch (Exception ex) { + LOGGER.error(ex); + } + + } + + disconnect(); + LOGGER.debug("Sender stopped."); + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterDispatcher.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,62 @@ +package com.passus.st.reporter.trx; + +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import com.passus.st.reporter.protocol.Reporter; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.apache.avro.AvroRemoteException; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; + +/** + * + * @author mikolaj.podbielski + */ +public class ReporterDispatcher { + + public static final int TYPE_METRIC = 1; + public static final int TYPE_METRICS_COLLECTION = 2; + + private static final Schema METRIC_SCHEMA = MetricRecord.getClassSchema(); + private static final Schema METRICS_COLLECTION_SCHEMA = MetricsCollectionRecord.getClassSchema(); + + private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final SpecificDatumReader<MetricRecord> metricReader = new SpecificDatumReader<>(METRIC_SCHEMA); + private final SpecificDatumReader<MetricsCollectionRecord> metricsCollectionReader = new SpecificDatumReader<>(METRICS_COLLECTION_SCHEMA); + + private final Reporter reporter; + + public ReporterDispatcher(Reporter reporter) { + this.reporter = reporter; + } + + public void dispatch(byte[] frame) throws AvroRemoteException, IOException { + int code = getInt4(frame, 0); + ByteArrayInputStream bais = new ByteArrayInputStream(frame, 4, frame.length - 4); + BinaryDecoder decoder = decoderFactory.binaryDecoder(bais, null); + + switch (code) { + case TYPE_METRIC: + MetricRecord metric = metricReader.read(null, decoder); + reporter.handleMetric(metric); + break; + case TYPE_METRICS_COLLECTION: + MetricsCollectionRecord metricsCollection = metricsCollectionReader.read(null, decoder); + reporter.handleMetricsCollection(metricsCollection); + break; + default: + System.out.println("Invalid message code: " + code); + } + } + + // copy-pasted from DataUtils + public static int getInt4(byte[] data, int off) { + return (((data[off] & 0xff) << 24) + | ((data[off + 1] & 0xff) << 16) + | ((data[off + 2] & 0xff) << 8) + | ((data[off + 3] & 0xff))); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterEncoder.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,42 @@ +package com.passus.st.reporter.trx; + +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; + +/** + * + * @author mikolaj.podbielski + */ +public class ReporterEncoder { + + private static final Schema METRIC_SCHEMA = MetricRecord.getClassSchema(); + private static final Schema METRICS_COLLECTION_SCHEMA = MetricsCollectionRecord.getClassSchema(); + + private final EncoderFactory encoderFactory = EncoderFactory.get(); + private final SpecificDatumWriter<MetricRecord> metricWriter = new SpecificDatumWriter<>(METRIC_SCHEMA); + private final SpecificDatumWriter<MetricsCollectionRecord> metricsCollectionWriter = new SpecificDatumWriter<>(METRICS_COLLECTION_SCHEMA); + + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + private final BinaryEncoder encoder = encoderFactory.binaryEncoder(baos, null); + + public byte[] encode(MetricRecord metric) throws IOException { + return encode(metricWriter, metric); + } + + public byte[] encode(MetricsCollectionRecord metricsCollection) throws IOException { + return encode(metricsCollectionWriter, metricsCollection); + } + + private <T> byte[] encode(SpecificDatumWriter<T> writer, T record) throws IOException { + baos.reset(); + writer.write(record, encoder); + encoder.flush(); + return baos.toByteArray(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,148 @@ +package com.passus.st.reporter.trx; + +import com.passus.st.reporter.protocol.Reporter; +import com.passus.st.reporter.server.ReporterImpl; +import com.passus.st.utils.CliUtils; +import static com.passus.st.utils.CliUtils.option; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import java.io.IOException; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import static com.passus.st.utils.CliUtils.option; + +/** + * + * @author Mirosław Hawrot + */ +public class ServerMain { + + public static final int PORT = 400; + public static final boolean EPOLL = false; + + private final String bindHost; + private final int port; + private final int numThreads; + private final int maxFrameSize = 8192; + + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final Class socketChannelClass; + + private final Reporter reporter; + + public ServerMain(String bindHost, int port, int numThreads, Reporter reporter) { + this.bindHost = bindHost; + this.port = port; + this.numThreads = numThreads; + this.reporter = reporter; + + if (EPOLL) { + bossGroup = new EpollEventLoopGroup(numThreads); + workerGroup = new EpollEventLoopGroup(); + socketChannelClass = EpollServerSocketChannel.class; + } else { + bossGroup = new NioEventLoopGroup(numThreads); + workerGroup = new NioEventLoopGroup(); + socketChannelClass = NioServerSocketChannel.class; + } + } + + public void start() { + try { + ServerBootstrap boot = new ServerBootstrap(); + boot.option(ChannelOption.SO_BACKLOG, 100_000); + boot.option(ChannelOption.SO_REUSEADDR, true); + boot.group(bossGroup, workerGroup) + .channel(socketChannelClass) + .childHandler(new TestServerInitializer()); + + Channel ch = boot.bind(bindHost, port).sync().channel(); + System.err.println("Server started " + bindHost + ":" + port + " epoll: " + EPOLL); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + public void stop() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + System.err.println("Server stopped."); + } + + public class TestServerInitializer extends ChannelInitializer<SocketChannel> { + + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); + pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); + pipeline.addLast(new ReporterChannelInboundHandler(reporter)); + } + } + + public static class ReporterChannelInboundHandler extends SimpleChannelInboundHandler<byte[]> { + + private final ReporterDispatcher dispatcher; + + public ReporterChannelInboundHandler(Reporter reporter) { + dispatcher = new ReporterDispatcher(reporter); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception { + dispatcher.dispatch(data); + } + } + + public static void main(String[] args) throws IOException { + final String syntax = "server <bind address> [-t threads=500]"; + final Options options = new Options(); + options.addOption(option("t", "threads", "number of worker threads", "threads")); + + options.addOption(option("m", "merged").desc("Write file with merged request and response data") + .hasArg(false) + .build() + ); + + try { + CommandLine cl = new DefaultParser().parse(options, args); + String[] clArgs = cl.getArgs(); + if (clArgs.length != 1) { + CliUtils.printHelp(options, syntax); + return; + } + + int threads = Integer.parseInt(cl.getOptionValue('t', "500")); + + ReporterImpl reporter = new ReporterImpl(cl.hasOption("m")); + reporter.setVerbose(false); + + ServerMain server = new ServerMain(clArgs[0], PORT, threads, reporter); + server.start(); + + Runnable shutdown = () -> { + System.out.println("shutdown"); + server.stop(); + }; + Runtime.getRuntime().addShutdownHook(new Thread(shutdown)); + } catch (org.apache.commons.cli.ParseException ex) { + CliUtils.printHelp(options, syntax); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,51 @@ +package com.passus.st.reporter.server; + +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.trx.ReporterClient; +import com.passus.st.reporter.trx.ReporterDispatcher; +import com.passus.st.reporter.trx.ReporterEncoder; +import java.io.DataOutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author mikolaj.podbielski + */ +public class Main { + + public static void main(String[] args) throws Exception { + Map<CharSequence, Object> fields = new HashMap<>(); + fields.put("url", "wp.pl/xyz"); + fields.put("ip", "127.0.0.1"); + fields.put("port", 12345); + fields.put("timestamp", 1234567890L); + MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields); + + ReporterEncoder reporterEncoder = new ReporterEncoder(); + byte[] bytes = reporterEncoder.encode(metricRecord); + int code = ReporterDispatcher.TYPE_METRIC; + int length = bytes.length + 4; + + int count = 6; + try (Socket s = new Socket("localhost", 400); + DataOutputStream os = new DataOutputStream(s.getOutputStream());) { + + for (int i = 0; i < count; i++) { + os.writeInt(length); + os.writeInt(code); + os.write(bytes); + } + } + + + ReporterClient rc = new ReporterClient(new InetSocketAddress("localhost", 400)); + rc.start(); + rc.send(new TestMetric()); + rc.waitForEmptyQueue(); + rc.stop(); + } + +}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java Wed Sep 20 13:29:12 2017 +0200 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java Wed Sep 20 23:11:11 2017 +0200 @@ -20,12 +20,12 @@ */ public class Test { - private static final Map<CharSequence, Object> FIELDS = map( + static final Map<CharSequence, Object> FIELDS = map( e("int_fld", 1), e("string_fld", "ok"), e("map_fld", smap("some", "entry")), e("bstr_fld", ByteString.create("byte-string-value")) ); - private static final MetricRecord METRIC = new MetricRecord("metric-x", FIELDS); - private static final InetSocketAddress ADDRESS = new InetSocketAddress(ServerMain.PORT); + static final MetricRecord METRIC = new MetricRecord("metric-x", FIELDS); + static final InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 11111); public static void main(String[] args) throws IOException { ReporterImpl reporter = new ReporterImpl();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,36 @@ +package com.passus.st.reporter.server; + +import com.passus.commons.metric.Metric; +import com.passus.st.reporter.trx.ServerMain; +import com.passus.st.reporter.trx.ReporterClient; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * + * @author mikolaj.podbielski + */ +public class TestCustom { + + static final TestMetric METRIC = new TestMetric(); + + public static void main(String[] args) throws IOException, InterruptedException { + ReporterImpl reporter = new ReporterImpl(); + reporter.setVerbose(true); + ServerMain server = new ServerMain("localhost", ServerMain.PORT, 100, reporter); + server.start(); + System.out.println("server started"); + + ReporterClient client = new ReporterClient(Test.ADDRESS); + client.start(); + client.send(METRIC); + client.waitForEmptyQueue(); + client.stop(); + + server.stop(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestMetric.java Wed Sep 20 23:11:11 2017 +0200 @@ -0,0 +1,78 @@ +package com.passus.st.reporter.server; + +import com.passus.commons.metric.Metric; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * + * @author mikolaj.podbielski + */ +public class TestMetric implements Metric { + + private final HashMap<String, Serializable> attributes = new HashMap<>(); + + { + attributes.put("String", "value"); + attributes.put("Long", 12345L); + attributes.put("Integer", 123); + attributes.put("Float", 123.45f); + } + + @Override + public String getName() { + return "test"; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + public void activate() { + } + + @Override + public void deactivate() { + } + + @Override + public Class getAttributeClass(String name) { + Serializable value = attributes.get(name); + return value.getClass(); + } + + @Override + public boolean hasAttribute(String name) { + return attributes.containsKey(name); + } + + @Override + public Set<String> getAttributesName() { + return attributes.keySet(); + } + + @Override + public Serializable getAttributeValue(String name) { + return attributes.get(name); + } + + @Override + public Map<String, Serializable> getAttributesValue() { + return attributes; + } + + @Override + public void update(Metric metric) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public void reset() { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + +}