Mercurial > stress-tester
changeset 558:7c805a77d33d
reporter - transport
line wrap: on
line diff
--- a/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester-benchmark/src/main/java/com/passus/st/avro/AvroCustomBenchmark.java Thu Sep 21 11:56:09 2017 +0200 @@ -1,7 +1,7 @@ package com.passus.st.avro; import com.passus.st.reporter.trx.ServerMain; -import com.passus.st.reporter.trx.ReporterClient; +import com.passus.st.reporter.trx.SocketReporterClient; import com.passus.st.reporter.protocol.Reporter; import com.passus.utils.AllocationUtils; import java.io.IOException; @@ -36,7 +36,7 @@ public class AvroCustomBenchmark extends AbstractAvroBenchmark { private final ServerMain server = new ServerMain("localhost", 11111, 500, new DummyReporter()); - private final ReporterClient client = new ReporterClient(serverAddress, new SynchronousQueue<>(), false); + private final SocketReporterClient client = new SocketReporterClient(serverAddress, new SynchronousQueue<>(), false); private final TestMetric metric = new TestMetric(); @Setup
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterClient.java Thu Sep 21 11:56:09 2017 +0200 @@ -0,0 +1,14 @@ +package com.passus.st.reporter; + +import com.passus.commons.service.Service; + +/** + * + * @author mikolaj.podbielski + */ +public interface ReporterClient extends Service { + + public boolean send(Object object); + + void waitForEmptyQueue() throws InterruptedException; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/ReporterImpl.java Thu Sep 21 11:56:09 2017 +0200 @@ -0,0 +1,207 @@ +package com.passus.st.reporter; + +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import com.passus.st.reporter.protocol.Reporter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; +import java.util.Map; +import org.apache.avro.AvroRemoteException; +import org.apache.avro.util.Utf8; + +/** + * + * @author mikolaj.podbielski + */ +public class ReporterImpl implements Reporter { + + private final PrintWriter reqFile; + private final PrintWriter respFile; + private final PrintWriter reqRespFile; + private final PrintWriter emitterFile; + private final boolean merge; + private boolean verbose; + + public ReporterImpl() throws IOException { + this(false); + } + + public ReporterImpl(boolean merge) throws IOException { + reqFile = new PrintWriter("requests.csv", "UTF-8"); + respFile = new PrintWriter("responses.csv", "UTF-8"); + reqRespFile = merge ? new PrintWriter("all.csv", "UTF-8") : null; + emitterFile = new PrintWriter("emitter.csv", "UTF-8"); + this.merge = merge; + } + + public void close() { + reqFile.close(); + respFile.close(); + if (merge) { + reqRespFile.close(); + } + emitterFile.close(); + } + + public void setVerbose(boolean verbose) { + this.verbose = verbose; + } + + private static void addValue(StringBuilder builder, Object value) { + if (value != null) { + builder.append('"'); + builder.append(value.toString()); + builder.append('"'); + } + builder.append(';'); + } + + private static String getValue(Map<CharSequence, ? extends Object> map, String header) { + Object result = null; + if (map != null) { + result = map.get(new Utf8(header)); + } + return result == null ? null : result.toString(); + } + + @Override + public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException { + if (verbose) { + System.out.println(metric); + } + + String code = metric.getCode().toString(); + + if (code.equalsIgnoreCase("httpRequestResponse")) { + Map<CharSequence, Object> fields = metric.getFields(); + StringBuilder builder = new StringBuilder(); + + addValue(builder, getValue(fields, "reqId")); + addValue(builder, getValue(fields, "method")); + addValue(builder, getValue(fields, "reqVersion")); + addValue(builder, getValue(fields, "url")); + addValue(builder, getValue(fields, "reqStart")); + addValue(builder, getValue(fields, "reqStop")); + addValue(builder, getValue(fields, "serverIp")); + addValue(builder, getValue(fields, "serverPort")); + addValue(builder, getValue(fields, "clientIp")); + addValue(builder, getValue(fields, "clientPort")); + addValue(builder, getValue(fields, "reqHdrSize")); + addValue(builder, getValue(fields, "reqCntSize")); + Map<CharSequence, CharSequence> reqHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("reqHdrs")); + addValue(builder, getValue(reqHdrs, "User-Agent")); + Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc")); + addValue(builder, getValue(misc, "sessionId")); + addValue(builder, getValue(misc, "username")); + reqFile.println(builder.toString()); + reqFile.flush(); + builder.setLength(0); + + addValue(builder, getValue(fields, "reqId")); + addValue(builder, getValue(fields, "reason")); + addValue(builder, getValue(fields, "code")); + addValue(builder, getValue(fields, "respStart")); + addValue(builder, getValue(fields, "respStop")); + Map<CharSequence, CharSequence> respHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("respHdrs")); + addValue(builder, getValue(respHdrs, "Content-Type")); + addValue(builder, getValue(fields, "respHdrSize")); + addValue(builder, getValue(fields, "respCntSize")); + respFile.println(builder.toString()); + respFile.flush(); + builder.setLength(0); + + // unused keys: + // origClientIp origClientPort origServerIp origServerPort + // respVersion + long reqStart = Long.parseLong(getValue(fields, "reqStart")); + long reqStop = Long.parseLong(getValue(fields, "reqStop")); + long respStart = Long.parseLong(getValue(fields, "respStart")); + long respStop = Long.parseLong(getValue(fields, "respStop")); + String url = getValue(fields, "url"); + printDuration(reqStart, reqStop, "req", url); + printDuration(reqStop, respStart, "r2r", url); + printDuration(respStart, respStop, "res", url); + + if (merge) { + addValue(builder, getValue(fields, "reqId")); + addValue(builder, getValue(fields, "clientPort")); + addValue(builder, getValue(fields, "reqStart")); + addValue(builder, getValue(fields, "reqStop")); + addValue(builder, getValue(fields, "respStart")); + addValue(builder, getValue(fields, "respStop")); + addValue(builder, String.valueOf(reqStop - reqStart)); + addValue(builder, String.valueOf(respStart - reqStop)); + addValue(builder, String.valueOf(respStop - respStart)); + addValue(builder, getValue(fields, "reqHdrSize")); + addValue(builder, getValue(fields, "reqCntSize")); + addValue(builder, getValue(fields, "respHdrSize")); + addValue(builder, getValue(fields, "respCntSize")); + addValue(builder, getValue(fields, "code")); + addValue(builder, getValue(fields, "method")); + addValue(builder, getValue(fields, "url")); + builder.setLength(builder.length() - 1); + reqRespFile.println(builder.toString()); + reqRespFile.flush(); + builder.setLength(0); + } + } + + return "OK"; + } + + private static void printDuration(long start, long end, String name, String url) { + long diff = end - start; + if (diff > 600) { + System.out.println("long " + name + ": " + diff + " " + url); + } + } + + @Override + public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException { + if (verbose) { + System.out.println(collection); + } + StringBuilder builder = new StringBuilder(); + + addValue(builder, collection.getStartTimestamp()); + addValue(builder, collection.getEndTimestamp()); + + MetricRecord emitterMetric = findMetric(collection, "emitter"); + if (emitterMetric != null) { + Map<CharSequence, Object> emitterFields = emitterMetric.getFields(); + addValue(builder, emitterFields.get(new Utf8("receivedBytes"))); + addValue(builder, emitterFields.get(new Utf8("sentBytes"))); + addValue(builder, emitterFields.get(new Utf8("establishedConnections"))); + addValue(builder, emitterFields.get(new Utf8("closedConnections"))); + addValue(builder, emitterFields.get(new Utf8("bindErrors"))); + + } else { + builder.append(";;;;;"); + } + + emitterMetric = findMetric(collection, "pcapSource"); + if (emitterMetric != null) { + Map<CharSequence, Object> emitterFields = emitterMetric.getFields(); + addValue(builder, emitterFields.get(new Utf8("frames"))); + addValue(builder, emitterFields.get(new Utf8("tcpPackets"))); + } else { + builder.append(";;"); + } + + emitterFile.println(builder.toString()); + emitterFile.flush(); + + return "OK"; + } + + private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) { + List<MetricRecord> metrics = collection.getMetrics(); + for (MetricRecord metric : metrics) { + if (metric.getCode().toString().equals(name)) { + return metric; + } + } + return null; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/AvroRpcReporterClient.java Thu Sep 21 11:56:09 2017 +0200 @@ -0,0 +1,185 @@ +package com.passus.st.reporter.server; + +import com.passus.commons.metric.Metric; +import com.passus.commons.metric.MetricsCollection; +import com.passus.commons.utils.SimpleThreadFactory; +import com.passus.st.reporter.MetricConverter; +import com.passus.st.reporter.ReporterClient; +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import com.passus.st.reporter.protocol.Reporter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; + +/** + * + * @author mikolaj.podbielski + */ +public class AvroRpcReporterClient implements ReporterClient { + + private static final Logger LOGGER = LogManager.getLogger(AvroRpcReporterClient.class); + + private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096); + private final MetricConverter converter = new MetricConverter(); + private final InetSocketAddress serverAddress; + private volatile boolean working; + private SenderThread sender; + + public AvroRpcReporterClient(InetSocketAddress serverAddress) { + this.serverAddress = serverAddress; + } + + @Override + public boolean send(Object object) { + if (!queue.offer(object)) { + System.out.println("reporter queue full"); + LOGGER.debug("Could not enqueue message."); + return false; + } + return true; + } + + @Override + public boolean isStarted() { + return working; + } + + @Override + public void start() { + if (working) { + return; + } + + working = true; + sender = new SenderThread(); + try { + sender.connect(); + } catch (IOException ignore) { + } + sender.start(); + } + + @Override + public void stop() { + if (working) { + working = false; + sender.interrupt(); + try { + sender.join(500); + } catch (Exception ex) { + } + sender = null; + } + } + + @Override + public void waitForEmptyQueue() throws InterruptedException { + while (queue.isEmpty() == false) { + Thread.sleep(100); + } + } + + private static Executor pool(String name, int corePoolSize) { + SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name); + return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), factory); + } + + private class SenderThread extends Thread { + + private static final long RETRY_PERIOD = 2000; + + private NettyTransceiver client; + private Reporter proxy; + private long lastFailure; + + private void connect() throws IOException { + if (lastFailure + RETRY_PERIOD > System.currentTimeMillis()) { + return; + } + + NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory( + pool(" Boss", 1), pool(" I/O Worker", 4)); + try { + client = new NettyTransceiver(serverAddress, factory, 10_000L); + proxy = (Reporter) SpecificRequestor.getClient(Reporter.class, client); + } catch (Exception ex) { + lastFailure = System.currentTimeMillis(); + disconnect(); + factory.releaseExternalResources(); + throw ex; + } + } + + private void disconnect() { + if (client != null) { + client.close(); + } + proxy = null; + client = null; + } + + private boolean isConnected() { + return proxy != null; + } + + @Override + public void run() { + while (working) { + try { + // TODO: refactor + Object metric = queue.take(); + Object record; + if (metric instanceof Metric) { + record = converter.convert((Metric) metric, false); + } else if (metric instanceof MetricsCollection) { + record = converter.convert((MetricsCollection) metric); + } else { + continue; + } + + if (!isConnected()) { + connect(); + } + if (isConnected()) { + CharSequence result; + if (record instanceof MetricRecord) { + result = proxy.handleMetric((MetricRecord) record); + } else if (record instanceof MetricsCollectionRecord) { + result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); + } else { + System.out.println(metric.getClass()); + continue; + } + LOGGER.trace("result: {}", result); + } + + } catch (InterruptedException ex) { + LOGGER.trace("Queue.take() was interrupted."); + } catch (AvroRemoteException ex) { + LOGGER.warn("Could not send.", ex); + } catch (IOException ex) { + LOGGER.warn("Could not connect.", ex); + } catch (Exception ex) { + LOGGER.error(ex); + } + } + + disconnect(); + LOGGER.debug("Sender stopped."); + } + + } +}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterClient.java Wed Sep 20 23:11:11 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,179 +0,0 @@ -package com.passus.st.reporter.server; - -import com.passus.commons.metric.Metric; -import com.passus.commons.metric.MetricsCollection; -import com.passus.commons.service.Service; -import com.passus.commons.utils.SimpleThreadFactory; -import com.passus.st.reporter.MetricConverter; -import com.passus.st.reporter.protocol.MetricRecord; -import com.passus.st.reporter.protocol.MetricsCollectionRecord; -import com.passus.st.reporter.protocol.Reporter; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.avro.AvroRemoteException; -import org.apache.avro.ipc.NettyTransceiver; -import org.apache.avro.ipc.specific.SpecificRequestor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; - -/** - * - * @author mikolaj.podbielski - */ -public class ReporterClient implements Service { - - private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class); - - private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(4096); - private final MetricConverter converter = new MetricConverter(); - private final InetSocketAddress serverAddress; - private volatile boolean working; - private SenderThread sender; - - public ReporterClient(InetSocketAddress serverAddress) { - this.serverAddress = serverAddress; - } - - public void send(Object object) { - if (!queue.offer(object)) { - LOGGER.debug("Could not enqueue message."); - } - } - - @Override - public boolean isStarted() { - return working; - } - - @Override - public void start() { - if (working) { - return; - } - - working = true; - sender = new SenderThread(); - try { - sender.connect(); - } catch (IOException ignore) { - } - sender.start(); - } - - @Override - public void stop() { - if (working) { - working = false; - sender.interrupt(); - try { - sender.join(500); - } catch (Exception ex) { - } - sender = null; - } - } - - public void waitForEmptyQueue() throws InterruptedException { - while (queue.isEmpty() == false) { - Thread.sleep(100); - } - } - - private static Executor pool(String name, int corePoolSize) { - SimpleThreadFactory factory = new SimpleThreadFactory("Avro NettyTransceiver" + name); - return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue<>(), factory); - } - - private class SenderThread extends Thread { - - private static final long RETRY_PERIOD = 2000; - - private NettyTransceiver client; - private Reporter proxy; - private long lastFailure; - - private void connect() throws IOException { - if (lastFailure + RETRY_PERIOD > System.currentTimeMillis()) { - return; - } - - NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory( - pool(" Boss", 1), pool(" I/O Worker", 4)); - try { - client = new NettyTransceiver(serverAddress, factory, 10_000L); - proxy = (Reporter) SpecificRequestor.getClient(Reporter.class, client); - } catch (Exception ex) { - lastFailure = System.currentTimeMillis(); - disconnect(); - factory.releaseExternalResources(); - throw ex; - } - } - - private void disconnect() { - if (client != null) { - client.close(); - } - proxy = null; - client = null; - } - - private boolean isConnected() { - return proxy != null; - } - - @Override - public void run() { - while (working) { - try { - // TODO: refactor - Object metric = queue.take(); - Object record; - if (metric instanceof Metric) { - record = converter.convert((Metric) metric, false); - } else if (metric instanceof MetricsCollection) { - record = converter.convert((MetricsCollection) metric); - } else { - continue; - } - - if (!isConnected()) { - connect(); - } - if (isConnected()) { - CharSequence result; - if (record instanceof MetricRecord) { - result = proxy.handleMetric((MetricRecord) record); - } else if (record instanceof MetricsCollectionRecord) { - result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); - } else { - continue; - } - LOGGER.trace("result: {}", result); - } - - } catch (InterruptedException ex) { - LOGGER.trace("Queue.take() was interrupted."); - } catch (AvroRemoteException ex) { - LOGGER.warn("Could not send.", ex); - } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); - } catch (Exception ex) { - LOGGER.error(ex); - } - } - - disconnect(); - LOGGER.debug("Sender stopped."); - } - - } -}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ReporterImpl.java Wed Sep 20 23:11:11 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,207 +0,0 @@ -package com.passus.st.reporter.server; - -import com.passus.st.reporter.protocol.MetricRecord; -import com.passus.st.reporter.protocol.MetricsCollectionRecord; -import com.passus.st.reporter.protocol.Reporter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; -import java.util.Map; -import org.apache.avro.AvroRemoteException; -import org.apache.avro.util.Utf8; - -/** - * - * @author mikolaj.podbielski - */ -public class ReporterImpl implements Reporter { - - private final PrintWriter reqFile; - private final PrintWriter respFile; - private final PrintWriter reqRespFile; - private final PrintWriter emitterFile; - private final boolean merge; - private boolean verbose; - - public ReporterImpl() throws IOException { - this(false); - } - - public ReporterImpl(boolean merge) throws IOException { - reqFile = new PrintWriter("requests.csv", "UTF-8"); - respFile = new PrintWriter("responses.csv", "UTF-8"); - reqRespFile = merge ? new PrintWriter("all.csv", "UTF-8") : null; - emitterFile = new PrintWriter("emitter.csv", "UTF-8"); - this.merge = merge; - } - - public void close() { - reqFile.close(); - respFile.close(); - if (merge) { - reqRespFile.close(); - } - emitterFile.close(); - } - - public void setVerbose(boolean verbose) { - this.verbose = verbose; - } - - private static void addValue(StringBuilder builder, Object value) { - if (value != null) { - builder.append('"'); - builder.append(value.toString()); - builder.append('"'); - } - builder.append(';'); - } - - private static String getValue(Map<CharSequence, ? extends Object> map, String header) { - Object result = null; - if (map != null) { - result = map.get(new Utf8(header)); - } - return result == null ? null : result.toString(); - } - - @Override - public CharSequence handleMetric(MetricRecord metric) throws AvroRemoteException { - if (verbose) { - System.out.println(metric); - } - - String code = metric.getCode().toString(); - - if (code.equalsIgnoreCase("httpRequestResponse")) { - Map<CharSequence, Object> fields = metric.getFields(); - StringBuilder builder = new StringBuilder(); - - addValue(builder, getValue(fields, "reqId")); - addValue(builder, getValue(fields, "method")); - addValue(builder, getValue(fields, "reqVersion")); - addValue(builder, getValue(fields, "url")); - addValue(builder, getValue(fields, "reqStart")); - addValue(builder, getValue(fields, "reqStop")); - addValue(builder, getValue(fields, "serverIp")); - addValue(builder, getValue(fields, "serverPort")); - addValue(builder, getValue(fields, "clientIp")); - addValue(builder, getValue(fields, "clientPort")); - addValue(builder, getValue(fields, "reqHdrSize")); - addValue(builder, getValue(fields, "reqCntSize")); - Map<CharSequence, CharSequence> reqHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("reqHdrs")); - addValue(builder, getValue(reqHdrs, "User-Agent")); - Map<CharSequence, CharSequence> misc = (Map<CharSequence, CharSequence>) fields.get(new Utf8("misc")); - addValue(builder, getValue(misc, "sessionId")); - addValue(builder, getValue(misc, "username")); - reqFile.println(builder.toString()); - reqFile.flush(); - builder.setLength(0); - - addValue(builder, getValue(fields, "reqId")); - addValue(builder, getValue(fields, "reason")); - addValue(builder, getValue(fields, "code")); - addValue(builder, getValue(fields, "respStart")); - addValue(builder, getValue(fields, "respStop")); - Map<CharSequence, CharSequence> respHdrs = (Map<CharSequence, CharSequence>) fields.get(new Utf8("respHdrs")); - addValue(builder, getValue(respHdrs, "Content-Type")); - addValue(builder, getValue(fields, "respHdrSize")); - addValue(builder, getValue(fields, "respCntSize")); - respFile.println(builder.toString()); - respFile.flush(); - builder.setLength(0); - - // unused keys: - // origClientIp origClientPort origServerIp origServerPort - // respVersion - long reqStart = Long.parseLong(getValue(fields, "reqStart")); - long reqStop = Long.parseLong(getValue(fields, "reqStop")); - long respStart = Long.parseLong(getValue(fields, "respStart")); - long respStop = Long.parseLong(getValue(fields, "respStop")); - String url = getValue(fields, "url"); - printDuration(reqStart, reqStop, "req", url); - printDuration(reqStop, respStart, "r2r", url); - printDuration(respStart, respStop, "res", url); - - if (merge) { - addValue(builder, getValue(fields, "reqId")); - addValue(builder, getValue(fields, "clientPort")); - addValue(builder, getValue(fields, "reqStart")); - addValue(builder, getValue(fields, "reqStop")); - addValue(builder, getValue(fields, "respStart")); - addValue(builder, getValue(fields, "respStop")); - addValue(builder, String.valueOf(reqStop - reqStart)); - addValue(builder, String.valueOf(respStart - reqStop)); - addValue(builder, String.valueOf(respStop - respStart)); - addValue(builder, getValue(fields, "reqHdrSize")); - addValue(builder, getValue(fields, "reqCntSize")); - addValue(builder, getValue(fields, "respHdrSize")); - addValue(builder, getValue(fields, "respCntSize")); - addValue(builder, getValue(fields, "code")); - addValue(builder, getValue(fields, "method")); - addValue(builder, getValue(fields, "url")); - builder.setLength(builder.length() - 1); - reqRespFile.println(builder.toString()); - reqRespFile.flush(); - builder.setLength(0); - } - } - - return "OK"; - } - - private static void printDuration(long start, long end, String name, String url) { - long diff = end - start; - if (diff > 600) { - System.out.println("long " + name + ": " + diff + " " + url); - } - } - - @Override - public CharSequence handleMetricsCollection(MetricsCollectionRecord collection) throws AvroRemoteException { - if (verbose) { - System.out.println(collection); - } - StringBuilder builder = new StringBuilder(); - - addValue(builder, collection.getStartTimestamp()); - addValue(builder, collection.getEndTimestamp()); - - MetricRecord emitterMetric = findMetric(collection, "emitter"); - if (emitterMetric != null) { - Map<CharSequence, Object> emitterFields = emitterMetric.getFields(); - addValue(builder, emitterFields.get(new Utf8("receivedBytes"))); - addValue(builder, emitterFields.get(new Utf8("sentBytes"))); - addValue(builder, emitterFields.get(new Utf8("establishedConnections"))); - addValue(builder, emitterFields.get(new Utf8("closedConnections"))); - addValue(builder, emitterFields.get(new Utf8("bindErrors"))); - - } else { - builder.append(";;;;;"); - } - - emitterMetric = findMetric(collection, "pcapSource"); - if (emitterMetric != null) { - Map<CharSequence, Object> emitterFields = emitterMetric.getFields(); - addValue(builder, emitterFields.get(new Utf8("frames"))); - addValue(builder, emitterFields.get(new Utf8("tcpPackets"))); - } else { - builder.append(";;"); - } - - emitterFile.println(builder.toString()); - emitterFile.flush(); - - return "OK"; - } - - private static MetricRecord findMetric(MetricsCollectionRecord collection, String name) { - List<MetricRecord> metrics = collection.getMetrics(); - for (MetricRecord metric : metrics) { - if (metric.getCode().toString().equals(name)) { - return metric; - } - } - return null; - } -}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/server/ServerMain.java Thu Sep 21 11:56:09 2017 +0200 @@ -1,5 +1,6 @@ package com.passus.st.reporter.server; +import com.passus.st.reporter.ReporterImpl; import com.passus.st.reporter.SnmpLogger; import com.passus.st.reporter.protocol.Reporter; import java.io.IOException;
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ReporterClient.java Wed Sep 20 23:11:11 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,205 +0,0 @@ -package com.passus.st.reporter.trx; - -import com.passus.commons.metric.Metric; -import com.passus.commons.metric.MetricsCollection; -import com.passus.commons.service.Service; -import com.passus.st.reporter.MetricConverter; -import com.passus.st.reporter.protocol.MetricRecord; -import com.passus.st.reporter.protocol.MetricsCollectionRecord; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import org.apache.avro.AvroRemoteException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * - * @author mikolaj.podbielski - */ -public class ReporterClient implements Service { - - private static final Logger LOGGER = LogManager.getLogger(ReporterClient.class); - - private final MetricConverter converter = new MetricConverter(); - private final InetSocketAddress serverAddress; - private final BlockingQueue<Object> queue; - private final boolean dropIfQueueFull; - private volatile boolean working; -// private SenderThread sender; - private SenderThread[] senders = new SenderThread[4]; - - public ReporterClient(InetSocketAddress serverAddress) { - this.serverAddress = serverAddress; - this.queue = new ArrayBlockingQueue<>(4096); - this.dropIfQueueFull = true; - } - - public ReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) { - this.serverAddress = serverAddress; - this.queue = queue; - this.dropIfQueueFull = dropIfQueueFull; - } - - @Override - public boolean isStarted() { - return working; - } - - @Override - public void start() { - if (working) { - return; - } - - working = true; -// sender = new SenderThread(); -// try { -// sender.connect(); -// } catch (IOException ignore) { -// } -// sender.start(); - for (int i = 0; i < senders.length; i++) { - senders[i] = new SenderThread(); - try { - senders[i].connect(); - } catch (IOException ignore) { - } - senders[i].start(); - } - - } - - @Override - public void stop() { - if (working) { - working = false; -// sender.interrupt(); -// try { -// sender.join(500); -// } catch (Exception ex) { -// } -// sender = null; - for (int i = 0; i < senders.length; i++) { - senders[i].interrupt(); - try { - senders[i].join(500); - } catch (Exception ex) { - } - senders[i] = null; - } - } - } - - public void waitForEmptyQueue() throws InterruptedException { - while (queue.isEmpty() == false) { - Thread.sleep(100); - } - } - - public boolean send(Object object) { - if (dropIfQueueFull) { - if (!queue.offer(object)) { - LOGGER.debug("Could not enqueue message."); - return false; - } - } else { - try { - queue.put(object); - } catch (InterruptedException ignore) { - return false; - } - } - return true; - } - - private class SenderThread extends Thread { - - private final ReporterEncoder encoder = new ReporterEncoder(); - private Socket socket; // only synchronized access - private volatile DataOutputStream os; - - private synchronized void connect() throws IOException { - try { - socket = new Socket(serverAddress.getAddress(), serverAddress.getPort()); - os = new DataOutputStream(socket.getOutputStream()); - } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); - } - } - - private synchronized void disconnect() { - try { - if (socket != null) { - socket.close(); - socket = null; - os = null; - } - } catch (IOException ignore) { - } - } - - private boolean isConnected() { - return os != null; - } - - @Override - public void run() { - while (working) { - try { - // TODO: refactor - Object metric = queue.take(); - int code; - byte[] bytes; - if (metric instanceof Metric) { - MetricRecord record = converter.convert((Metric) metric, false); - code = ReporterDispatcher.TYPE_METRIC; - bytes = encoder.encode(record); - } else if (metric instanceof MetricsCollection) { - MetricsCollectionRecord record = converter.convert((MetricsCollection) metric); - code = ReporterDispatcher.TYPE_METRICS_COLLECTION; - bytes = encoder.encode(record); - } else { - continue; - } - - if (!isConnected()) { - connect(); - } - if (isConnected()) { -// CharSequence result; -// if (record instanceof MetricRecord) { -// result = proxy.handleMetric((MetricRecord) record); -// } else if (record instanceof MetricsCollectionRecord) { -// result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); -// } else { -// continue; -// } -// LOGGER.trace("result: {}", result); - synchronized (this) { - os.writeInt(bytes.length + 4); - os.writeInt(code); - os.write(bytes); - } - } - } catch (InterruptedException ex) { - LOGGER.trace("Queue.take() was interrupted."); - } catch (AvroRemoteException ex) { - LOGGER.warn("Could not send.", ex); - } catch (IOException ex) { - LOGGER.warn("Could not connect.", ex); - } catch (Exception ex) { - LOGGER.error(ex); - } - - } - - disconnect(); - LOGGER.debug("Sender stopped."); - } - - } -}
--- a/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/ServerMain.java Thu Sep 21 11:56:09 2017 +0200 @@ -1,7 +1,7 @@ package com.passus.st.reporter.trx; import com.passus.st.reporter.protocol.Reporter; -import com.passus.st.reporter.server.ReporterImpl; +import com.passus.st.reporter.ReporterImpl; import com.passus.st.utils.CliUtils; import static com.passus.st.utils.CliUtils.option; import io.netty.bootstrap.ServerBootstrap; @@ -31,7 +31,7 @@ */ public class ServerMain { - public static final int PORT = 400; + public static final int PORT = 11111; public static final boolean EPOLL = false; private final String bindHost;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester-reporter/src/main/java/com/passus/st/reporter/trx/SocketReporterClient.java Thu Sep 21 11:56:09 2017 +0200 @@ -0,0 +1,209 @@ +package com.passus.st.reporter.trx; + +import com.passus.commons.metric.Metric; +import com.passus.commons.metric.MetricsCollection; +import com.passus.st.reporter.MetricConverter; +import com.passus.st.reporter.ReporterClient; +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.protocol.MetricsCollectionRecord; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.avro.AvroRemoteException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author mikolaj.podbielski + */ +public class SocketReporterClient implements ReporterClient { + + private static final Logger LOGGER = LogManager.getLogger(SocketReporterClient.class); + + private final MetricConverter converter = new MetricConverter(); + private final InetSocketAddress serverAddress; + private final BlockingQueue<Object> queue; + private final boolean dropIfQueueFull; + private volatile boolean working; +// private SenderThread sender; + private SenderThread[] senders = new SenderThread[4]; + + public SocketReporterClient(InetSocketAddress serverAddress) { + this.serverAddress = serverAddress; + this.queue = new ArrayBlockingQueue<>(4096); + this.dropIfQueueFull = true; + } + + public SocketReporterClient(InetSocketAddress serverAddress, BlockingQueue<Object> queue, boolean dropIfQueueFull) { + this.serverAddress = serverAddress; + this.queue = queue; + this.dropIfQueueFull = dropIfQueueFull; + } + + @Override + public boolean isStarted() { + return working; + } + + @Override + public void start() { + if (working) { + return; + } + + working = true; +// sender = new SenderThread(); +// try { +// sender.connect(); +// } catch (IOException ignore) { +// } +// sender.start(); + for (int i = 0; i < senders.length; i++) { + senders[i] = new SenderThread(); + try { + senders[i].connect(); + } catch (IOException ignore) { + } + senders[i].start(); + } + + } + + @Override + public void stop() { + if (working) { + working = false; +// sender.interrupt(); +// try { +// sender.join(500); +// } catch (Exception ex) { +// } +// sender = null; + for (int i = 0; i < senders.length; i++) { + senders[i].interrupt(); + try { + senders[i].join(500); + } catch (Exception ex) { + } + senders[i] = null; + } + } + } + + @Override + public void waitForEmptyQueue() throws InterruptedException { + while (queue.isEmpty() == false) { + Thread.sleep(100); + } + } + + @Override + public boolean send(Object object) { + if (dropIfQueueFull) { + if (!queue.offer(object)) { + System.out.println("reporter queue full"); + LOGGER.debug("Could not enqueue message."); + return false; + } + } else { + try { + queue.put(object); + } catch (InterruptedException ignore) { + return false; + } + } + return true; + } + + private class SenderThread extends Thread { + + private final ReporterEncoder encoder = new ReporterEncoder(); + private Socket socket; // only synchronized access + private volatile DataOutputStream os; + + private synchronized void connect() throws IOException { + try { + socket = new Socket(serverAddress.getAddress(), serverAddress.getPort()); + os = new DataOutputStream(socket.getOutputStream()); + } catch (IOException ex) { + LOGGER.warn("Could not connect.", ex); + } + } + + private synchronized void disconnect() { + try { + if (socket != null) { + socket.close(); + socket = null; + os = null; + } + } catch (IOException ignore) { + } + } + + private boolean isConnected() { + return os != null; + } + + @Override + public void run() { + while (working) { + try { + // TODO: refactor + Object metric = queue.take(); + int code; + byte[] bytes; + if (metric instanceof Metric) { + MetricRecord record = converter.convert((Metric) metric, false); + code = ReporterDispatcher.TYPE_METRIC; + bytes = encoder.encode(record); + } else if (metric instanceof MetricsCollection) { + MetricsCollectionRecord record = converter.convert((MetricsCollection) metric); + code = ReporterDispatcher.TYPE_METRICS_COLLECTION; + bytes = encoder.encode(record); + } else { + System.out.println(metric.getClass()); + continue; + } + + if (!isConnected()) { + connect(); + } + if (isConnected()) { +// CharSequence result; +// if (record instanceof MetricRecord) { +// result = proxy.handleMetric((MetricRecord) record); +// } else if (record instanceof MetricsCollectionRecord) { +// result = proxy.handleMetricsCollection((MetricsCollectionRecord) record); +// } else { +// continue; +// } +// LOGGER.trace("result: {}", result); + synchronized (this) { + os.writeInt(bytes.length + 4); + os.writeInt(code); + os.write(bytes); + } + } + } catch (InterruptedException ex) { + LOGGER.trace("Queue.take() was interrupted."); + } catch (AvroRemoteException ex) { + LOGGER.warn("Could not send.", ex); + } catch (IOException ex) { + LOGGER.warn("Could not connect.", ex); + } catch (Exception ex) { + LOGGER.error(ex); + } + + } + + disconnect(); + LOGGER.debug("Sender stopped."); + } + + } +}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Main.java Wed Sep 20 23:11:11 2017 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,51 +0,0 @@ -package com.passus.st.reporter.server; - -import com.passus.st.reporter.protocol.MetricRecord; -import com.passus.st.reporter.trx.ReporterClient; -import com.passus.st.reporter.trx.ReporterDispatcher; -import com.passus.st.reporter.trx.ReporterEncoder; -import java.io.DataOutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.HashMap; -import java.util.Map; - -/** - * - * @author mikolaj.podbielski - */ -public class Main { - - public static void main(String[] args) throws Exception { - Map<CharSequence, Object> fields = new HashMap<>(); - fields.put("url", "wp.pl/xyz"); - fields.put("ip", "127.0.0.1"); - fields.put("port", 12345); - fields.put("timestamp", 1234567890L); - MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields); - - ReporterEncoder reporterEncoder = new ReporterEncoder(); - byte[] bytes = reporterEncoder.encode(metricRecord); - int code = ReporterDispatcher.TYPE_METRIC; - int length = bytes.length + 4; - - int count = 6; - try (Socket s = new Socket("localhost", 400); - DataOutputStream os = new DataOutputStream(s.getOutputStream());) { - - for (int i = 0; i < count; i++) { - os.writeInt(length); - os.writeInt(code); - os.write(bytes); - } - } - - - ReporterClient rc = new ReporterClient(new InetSocketAddress("localhost", 400)); - rc.start(); - rc.send(new TestMetric()); - rc.waitForEmptyQueue(); - rc.stop(); - } - -}
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/Test.java Thu Sep 21 11:56:09 2017 +0200 @@ -2,6 +2,7 @@ import static com.passus.commons.collection.FluentBuilder.*; import com.passus.data.ByteString; +import com.passus.st.reporter.ReporterImpl; import com.passus.st.reporter.protocol.MetricRecord; import com.passus.st.reporter.protocol.Reporter; import java.io.IOException;
--- a/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester-reporter/src/test/java/com/passus/st/reporter/server/TestCustom.java Thu Sep 21 11:56:09 2017 +0200 @@ -1,13 +1,15 @@ package com.passus.st.reporter.server; -import com.passus.commons.metric.Metric; +import com.passus.st.reporter.ReporterImpl; +import com.passus.st.reporter.protocol.MetricRecord; +import com.passus.st.reporter.trx.ReporterDispatcher; +import com.passus.st.reporter.trx.ReporterEncoder; import com.passus.st.reporter.trx.ServerMain; -import com.passus.st.reporter.trx.ReporterClient; -import java.io.IOException; -import java.io.Serializable; +import com.passus.st.reporter.trx.SocketReporterClient; +import java.io.DataOutputStream; +import java.net.Socket; import java.util.HashMap; import java.util.Map; -import java.util.Set; /** * @@ -17,20 +19,48 @@ static final TestMetric METRIC = new TestMetric(); - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) throws Exception { ReporterImpl reporter = new ReporterImpl(); reporter.setVerbose(true); ServerMain server = new ServerMain("localhost", ServerMain.PORT, 100, reporter); server.start(); System.out.println("server started"); - ReporterClient client = new ReporterClient(Test.ADDRESS); - client.start(); - client.send(METRIC); - client.waitForEmptyQueue(); - client.stop(); + client(); server.stop(); } + static void manual() throws Exception { + Map<CharSequence, Object> fields = new HashMap<>(); + fields.put("url", "wp.pl/xyz"); + fields.put("ip", "127.0.0.1"); + fields.put("port", 12345); + fields.put("timestamp", 1234567890L); + MetricRecord metricRecord = new MetricRecord("httpRequestResponse", fields); + + ReporterEncoder reporterEncoder = new ReporterEncoder(); + byte[] bytes = reporterEncoder.encode(metricRecord); + int code = ReporterDispatcher.TYPE_METRIC; + int length = bytes.length + 4; + + int count = 6; + try (Socket s = new Socket("localhost", 11111); + DataOutputStream os = new DataOutputStream(s.getOutputStream());) { + + for (int i = 0; i < count; i++) { + os.writeInt(length); + os.writeInt(code); + os.write(bytes); + } + } + } + + static void client() throws Exception { + SocketReporterClient rc = new SocketReporterClient(Test.ADDRESS); + rc.start(); + rc.send(METRIC); + rc.waitForEmptyQueue(); + rc.stop(); + } }
--- a/stress-tester/src/main/java/com/passus/st/Main.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Thu Sep 21 11:56:09 2017 +0200 @@ -24,7 +24,9 @@ import com.passus.st.metric.FileMetricsCollectionAppender; import com.passus.st.metric.ScheduledMetricsCollector; import com.passus.st.metric.SummrizeMetricsCollectionHandler; -import com.passus.st.reporter.server.ReporterClient; +import com.passus.st.reporter.ReporterClient; +import com.passus.st.reporter.server.AvroRpcReporterClient; +import com.passus.st.reporter.trx.SocketReporterClient; import com.passus.st.source.PcapSessionEventSource; import static com.passus.st.utils.CliUtils.option; import com.passus.st.utils.PeriodFormatter; @@ -156,6 +158,10 @@ .build() ); + options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.") + .hasArg(false) + .build()); + options.addOption(option("wf", "writeFile").desc("Write result to file.") .hasArg().argName("file").optionalArg(true) .build() @@ -302,7 +308,12 @@ if (cl.hasOption("ri")) { int port = 11111; InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri")); - reporterClient = new ReporterClient(new InetSocketAddress(addr, port)); + InetSocketAddress socketAddr = new InetSocketAddress(addr, port); + if (cl.hasOption("nrp")) { + reporterClient = new SocketReporterClient(socketAddr); + } else { + reporterClient = new AvroRpcReporterClient(socketAddr); + } reporterClient.start(); client.addListener(new HttpReporterClientListener(reporterClient));
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Sep 21 11:56:09 2017 +0200 @@ -28,7 +28,7 @@ import com.passus.st.metric.FileMetricsCollectionAppender; import com.passus.st.metric.ScheduledMetricsCollector; import com.passus.st.metric.SummrizeMetricsCollectionHandler; -import com.passus.st.reporter.server.ReporterClient; +import com.passus.st.reporter.trx.SocketReporterClient; import com.passus.st.source.PcapSessionEventSource; import static com.passus.st.utils.CliUtils.option; import com.passus.st.utils.PeriodFormatter; @@ -43,6 +43,8 @@ import org.apache.commons.cli.ParseException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import static com.passus.st.Main.printMetrics; +import static com.passus.st.utils.CliUtils.option; /** * @@ -91,7 +93,7 @@ Log4jConfigurationFactory.enableFactory(logLevel); InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri")); - ReporterClient reporterClient = new ReporterClient(new InetSocketAddress(addr, 11111)); + SocketReporterClient reporterClient = new SocketReporterClient(new InetSocketAddress(addr, 11111)); reporterClient.start(); LocalHandler lh = new LocalHandler(reporterClient, cl.hasOption("ps")); @@ -174,7 +176,7 @@ private volatile int count; private volatile boolean finished; - public LocalHandler(ReporterClient reporterClient, boolean partialSession) { + public LocalHandler(SocketReporterClient reporterClient, boolean partialSession) { reporter = new HttpReporterClientListener(reporterClient); this.partialSession = partialSession; }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterClientListener.java Thu Sep 21 11:56:09 2017 +0200 @@ -14,7 +14,7 @@ import static com.passus.st.client.http.HttpConsts.TAG_TIME_END; import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; import com.passus.st.emitter.SessionInfo; -import com.passus.st.reporter.server.ReporterClient; +import com.passus.st.reporter.ReporterClient; import java.util.HashSet; import java.util.Map; import java.util.Set;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpReporterMetricHandler.java Thu Sep 21 11:56:09 2017 +0200 @@ -2,7 +2,7 @@ import com.passus.commons.metric.MetricsCollection; import com.passus.st.metric.MetricsCollectionHandler; -import com.passus.st.reporter.server.ReporterClient; +import com.passus.st.reporter.ReporterClient; /** *
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Wed Sep 20 23:11:11 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Thu Sep 21 11:56:09 2017 +0200 @@ -131,7 +131,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(SessionUtils.transportToString(transport)).append(" "); +// sb.append(SessionUtils.transportToString(transport)).append(" "); sb.append(srcIp).append(":").append(srcPort); sb.append("<->"); sb.append(dstIp).append(":").append(dstPort);