Mercurial > stress-tester
changeset 628:ace9b1e69e96
NioEmitterWorker2
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/Main2.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,473 @@ +package com.passus.st; + +import com.passus.commons.ConversionException; +import com.passus.commons.metric.MapMetric; +import com.passus.commons.metric.Metric; +import com.passus.commons.service.Registry; +import com.passus.config.Configuration; +import com.passus.config.YamlConfigurationReader; +import com.passus.config.validation.Errors; +import static com.passus.config.validation.ErrorsUtils.objectErrorToString; +import com.passus.config.validation.ObjectError; +import com.passus.net.PortRangeSet; +import com.passus.st.client.MemoryEventsCache; +import com.passus.st.client.http.DumperHttpClientListener; +import com.passus.st.client.http.HttpClient; +import com.passus.st.client.http.ReporterRemoteDestination; +import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher; +import com.passus.st.client.http.ReporterDestination; +import com.passus.st.client.http.ReporterFileDestination; +import com.passus.st.client.http.SummaryHttpClientListener; +import com.passus.st.client.http.WriterHttpClientListener; +import com.passus.st.client.http.filter.HttpFiltersConfigurator; +import com.passus.st.emitter.PassThroughSessionMapper; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.emitter.nio.NioEmitterWorker2; +import com.passus.st.metric.FileMetricsCollectionAppender; +import com.passus.st.metric.ScheduledMetricsCollector; +import com.passus.st.metric.SummrizeMetricsCollectionHandler; +import com.passus.st.reporter.ReporterClient; +import com.passus.st.reporter.server.AvroRpcReporterClient; +import com.passus.st.reporter.trx.SocketReporterClient; +import com.passus.st.source.PcapSessionEventSource; +import com.passus.st.utils.PeriodFormatter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import static com.passus.st.utils.CliUtils.option; + +/** + * + * @author Mirosław Hawrot + */ +public class Main2 { + + private final SummrizeMetricsCollectionHandler summMetricsHandler = new SummrizeMetricsCollectionHandler(); + private long startTime; + + static void printHelp(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("[options] <pcap>", "description", options, ""); + } + + static void printError(String msg) { + System.err.print(msg); + System.exit(1); + } + + static void printMetrics(List<Metric> metrics, long startTime) { + if (startTime == 0) { + return; + } + + long endTime = System.currentTimeMillis(); + synchronized (System.out) { + System.out.println(""); + try { + System.out.println("Elapsed time: " + PeriodFormatter.INSTANCE.reverseTransform(endTime - startTime) + "."); + } catch (ConversionException conversionException) { + } + System.out.println("Metrics:"); + + String line = "%24s: %s\n"; + for (Metric metric : metrics) { + System.out.println(metric.getName() + ":"); + Map<String, Serializable> valuesMap = metric.getAttributesValue(); + for (Map.Entry<String, Serializable> entry : valuesMap.entrySet()) { + String key = entry.getKey(); + Serializable value = entry.getValue(); + + System.out.printf(line, key, value); + } + } + } + } + + void printMetrics() { + List<Metric> metrics = summMetricsHandler.getMetrics(); + printMetrics(metrics, startTime); + } + + List<Metric> getMetrics() { + return summMetricsHandler.getMetrics(); + } + + void start(String... args) { + AppUtils.registerAll(); + final Options options = new Options(); + + options.addOption(option("l", "logLevel").desc("Log level.") + .hasArg().argName("level") + .build() + ); + + options.addOption(option("ff", "filtersFile").desc("Filters file.") + .hasArg().argName("file") + .build() + ); + + options.addOption(option("mr", "mapperRule").desc("Session mapper rule.") + .hasArg().argName("rule") + .build() + ); + + options.addOption(option("ps", "allowPartialSession").desc("Allow partial sessions.") + .hasArg(false) + .build() + ); + + options.addOption(option("hp", "httpPorts").desc("Specify HTTP ports in input file (default: 80, 8080)") + .hasArg().argName("ports") + .build() + ); + + options.addOption(option("rs", "replaySpeed").desc("Speedup factor (default 0 - top speed)") + .hasArg().argName("speed") + .build()); + + options.addOption(option("pr", "parallelReplays").desc("Number of parallel replays. Works only for one pcap file.") + .hasArg().argName("replays") + .build()); + + options.addOption(option("ca", "cache").desc("Cache (and preprocess) input file.") + .hasArg(false) + .build()); + + options.addOption(option("lp", "loops").desc("Loops number (default 1).") + .hasArg().argName("loop") + .build() + ); + + options.addOption(option("wt", "workerType").desc("Worker type: synch|asynch|parallel (default synch).") + .hasArg().argName("type") + .build() + ); + + options.addOption(option("rd", "reporterDirectory").desc("Reporter directory.") + .hasArg().argName("ip") + .build() + ); + options.addOption(option("ri", "reporterIp").desc("Reporter ip address.") + .hasArg().argName("ip") + .build() + ); + options.addOption(option("nrp", "newReporterProto").desc("Enables new reporter protocol.") + .hasArg(false) + .build()); + options.addOption(option("nrt", "newReporterThreads").desc("Number of sending threads. (range 1 - 8, default 2)") + .hasArg().argName("threads") + .build() + ); + + options.addOption(option("wf", "writeFile").desc("Write result to file.") + .hasArg().argName("file").optionalArg(true) + .build() + ); + + options.addOption(option("wd", "writeDirectory").desc("Write HTTP messages to separate files.") + .hasArg().argName("directory") + .build() + ); + + options.addOption(option("wdro", "writeDirectoryRequestsOnly").desc("Write only HTTP requests.") + .hasArg(false) + .build() + ); + + options.addOption(option("wm", "writeMetrics").desc("Write metrics to file.") + .hasArg().argName("file") + .build() + ); + + try { + CommandLine cl = new DefaultParser().parse(options, args); + String[] clArgs = cl.getArgs(); + if (clArgs.length < 1) { + System.err.println("At least one pcap file required."); + printHelp(options); + return; + } + + String logLevel = "error"; + if (cl.hasOption("l")) { + logLevel = cl.getOptionValue("l"); + } + if (!logLevel.equals("default")) { + Log4jConfigurationFactory.enableFactory(logLevel); + } + + SessionMapper mapper; + if (cl.hasOption("mr")) { + RuleBasedSessionMapper constMapper = new RuleBasedSessionMapper(); + String[] rules = cl.getOptionValues("mr"); + for (String rule : rules) { + try { + constMapper.addRule(rule); + } catch (Exception e) { + printError("Invalid mapper rule '" + rule + "'."); + } + } + + mapper = constMapper; + } else { + mapper = new PassThroughSessionMapper(); + } + + NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class); + emitter.setSessionMapper(mapper); + emitter.setCollectMetrics(true); + + HttpClient client = new HttpClient(emitter); + client.setCollectMetrics(true); + client.setConnectPartialSession(cl.hasOption("ps")); + client.setWokerType(cl.getOptionValue("wt", "synch")); + client.addListener((request, response, context) -> { + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + }); + if (cl.hasOption("pr")) { + if (clArgs.length != 1) { + throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file."); + } + + int parallelReplays = Integer.parseInt(cl.getOptionValue("pr")); + if (parallelReplays > 0 && parallelReplays <= 100) { + emitter.setMaxThreads(parallelReplays); + client.setWorkersNum(parallelReplays); + } else { + throw new IllegalArgumentException("Parameter \"parallelReplays\" should be in range 1-100."); + } + } else { + client.setWorkersNum(clArgs.length); + client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher()); + } + emitter.start(); + + if (cl.hasOption("rs")) { + float speed = Float.parseFloat(cl.getOptionValue("rs")); + if (speed > .1f && speed < 100f) { + client.setSleepFactor(1f / speed); + } else if (speed != 0f) { + throw new IllegalArgumentException(); + } + } + + if (cl.hasOption("wf")) { + WriterHttpClientListener writerListener; + String value = cl.getOptionValue("wf"); + if (value != null && !value.equals("-")) { + File outFile = new File(cl.getOptionValue("wf")); + writerListener = WriterHttpClientListener.createFileListener(outFile); + } else { + writerListener = WriterHttpClientListener.createStdoutListener(); + } + writerListener.write(Arrays.asList(args).toString()); + writerListener.write("\n"); + client.addListener(writerListener); + } + + SummaryHttpClientListener summaryListener = null; + if (cl.hasOption("wd")) { + File outDir = new File(cl.getOptionValue("wd")); + DumperHttpClientListener dumper = new DumperHttpClientListener(outDir); + dumper.setRequestsOnly(cl.hasOption("wdro")); + dumper.setDecodeContent(true); + File directory = dumper.getDirectory(); + if (directory.isDirectory() || directory.mkdirs()) { + client.addListener(dumper); + summaryListener = new SummaryHttpClientListener(new File(directory, "summary.txt")); + File file = new File(directory, "cmd." + new File(clArgs[0]).getName() + ".txt"); + String cmd = String.join(" ", args); + try (FileOutputStream fos = new FileOutputStream(file);) { + IOUtils.write(cmd, fos); + } + client.addListener(summaryListener); + } else { + throw new Exception("Cannot create directory: " + directory.getAbsolutePath()); + } + } + + ScheduledMetricsCollector collector = new ScheduledMetricsCollector(); + + ReporterClient reporterClient = null; + if (cl.hasOption("ri")) { + int port = 11111; + InetAddress addr = InetAddress.getByName(cl.getOptionValue("ri")); + InetSocketAddress socketAddr = new InetSocketAddress(addr, port); + if (cl.hasOption("nrp")) { + int threads = Integer.parseInt(cl.getOptionValue("nrt", "2")); + reporterClient = new SocketReporterClient(socketAddr, threads); + } else { + reporterClient = new AvroRpcReporterClient(socketAddr); + } + reporterClient.start(); + + ReporterDestination reporterDestination = new ReporterRemoteDestination(reporterClient); + Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination); + client.addListener(reporterDestination); + collector.addHandler(reporterDestination); + } else if (cl.hasOption("rd")) { + // TODO: refactor + ReporterDestination reporterDestination = new ReporterFileDestination(cl.getOptionValue("rd")); + Registry.getInstance().add(ReporterDestination.SERVICE_NAME, reporterDestination); + client.addListener(reporterDestination); + collector.addHandler(reporterDestination); + } + + if (cl.hasOption("ff")) { + File filtersFile = new File(cl.getOptionValue("ff")); + Configuration config = YamlConfigurationReader.readFromFile(filtersFile); + HttpFiltersConfigurator configurator = new HttpFiltersConfigurator(client); + Errors errors = new Errors(); + configurator.configure(config, errors); + if (errors.getErrorCount() != 0) { + System.out.println("Error in file '" + filtersFile.getAbsolutePath() + "'."); + for (ObjectError error : errors.getAllErrors()) { + System.out.println("\t" + objectErrorToString(error)); + } + System.exit(1); + } + } + + client.start(); + + PcapSessionEventSource[] eventSrcs = new PcapSessionEventSource[clArgs.length]; + + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = new PcapSessionEventSource(); + eventSrc.setPcapFile(clArgs[i]); + eventSrc.setAllowPartialSession(cl.hasOption("ps")); + eventSrc.setCollectMetrics(true); + eventSrcs[i] = eventSrc; + } + + if (cl.hasOption("hp")) { + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + PortRangeSet portsRanges = eventSrc.getPortsRange(); + portsRanges.clear(); + + String[] ports = cl.getOptionValues("hp"); + for (String port : ports) { + portsRanges.add(port); + } + } + } + + int loops = Integer.parseInt(cl.getOptionValue("lp", "1")); + if (loops <= 0) { + throw new Exception("Loop should be greater than zero."); + } + + if (cl.hasOption("wm")) { + File metricsFile = new File(cl.getOptionValue("wm")); + if (!metricsFile.exists() && !metricsFile.createNewFile()) { + printError("Unable to create metrics file '" + metricsFile + "'."); + } + + FileMetricsCollectionAppender fileAppender = new FileMetricsCollectionAppender(); + fileAppender.setFileName(metricsFile.getAbsolutePath()); + fileAppender.start(); + + collector.addHandler(fileAppender); + } + + collector.addHandler(summMetricsHandler); + for (int i = 0; i < clArgs.length; i++) { + collector.register(eventSrcs[i]); + } + + collector.register(emitter); + collector.register(client); + collector.start(); + + if (cl.hasOption("ca")) { + Set<String> sourcesName = new HashSet<>(); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + sourcesName.add(eventSrc.getName()); + } + + MemoryEventsCache cache = new MemoryEventsCache(client, sourcesName); + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + eventSrc.setHandler(cache); + eventSrc.start(); + } + cache.setLoop(loops); + cache.await(); + cache.send(); + } else { + for (int i = 0; i < clArgs.length; i++) { + PcapSessionEventSource eventSrc = eventSrcs[i]; + eventSrc.setHandler(client); + eventSrc.setLoops(loops); + eventSrc.start(); + } + } + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this::printMetrics, 5, 5, TimeUnit.SECONDS); + + client.join(); + + long endTime = System.currentTimeMillis(); + System.out.println("Broadcast finished. Duration " + PeriodFormatter.INSTANCE.reverseTransform(endTime - startTime) + "."); + + collector.collect(); + + executor.shutdownNow(); + for (int i = 0; i < clArgs.length; i++) { + eventSrcs[i].stop(); + } + + client.stop(); + emitter.stop(); + if (reporterClient != null) { + reporterClient.send(new MapMetric("endOfReplay", Collections.emptyMap())); + reporterClient.waitForEmptyQueue(); + reporterClient.stop(); + System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages()); + } + if (summaryListener != null) { + summaryListener.close(); + } + + collector.flush(true); + + printMetrics(); + } catch (ParseException e) { + System.out.println(e.getMessage()); + printHelp(options); + } catch (Exception e) { + e.printStackTrace(System.err); + } finally { + AppUtils.unregisterAll(); + } + } + + public static void main(String... args) { + new Main2().start(args); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Tue Oct 17 13:10:50 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Tue Oct 24 15:26:49 2017 +0200 @@ -28,7 +28,7 @@ public static final String TYPE = "parallel"; - public static final int DEFAULT_MAX_SENT_REQUESTS = 1024; + public static final int DEFAULT_MAX_SENT_REQUESTS = 10; private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); @@ -193,6 +193,32 @@ makeFirst(flowContext); } + @Override + public void handle(Event event) { + Event newEvent = null; + switch (event.getType()) { + case HttpSessionPayloadEvent.TYPE: + semaphore.acquireUninterruptibly(); + newEvent = eventInstanceForWorker(event); + break; + case SessionStatusEvent.TYPE: + case DataLoopEnd.TYPE: + case DataEnd.TYPE: + newEvent = event; + } + + if (newEvent != null) { + synchronized (lock) { + try { + eventsQueue.put(newEvent); + lock.notifyAll(); + } catch (Exception e) { + logger.debug("Unable to add event to queue. " + e.getMessage(), e); + } + } + } + } + private void processEvent(Event event) { if (event instanceof SessionEvent) { switch (event.getType()) { @@ -258,32 +284,6 @@ } @Override - public void handle(Event event) { - Event newEvent = null; - switch (event.getType()) { - case HttpSessionPayloadEvent.TYPE: - semaphore.acquireUninterruptibly(); - newEvent = eventInstanceForWorker(event); - break; - case SessionStatusEvent.TYPE: - case DataLoopEnd.TYPE: - case DataEnd.TYPE: - newEvent = event; - } - - if (newEvent != null) { - synchronized (lock) { - try { - eventsQueue.put(newEvent); - lock.notifyAll(); - } catch (Exception e) { - logger.debug("Unable to add event to queue. " + e.getMessage(), e); - } - } - } - } - - @Override public void run() { synchronized (lock) { working = true;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpStresserWorker.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,73 @@ +package com.passus.st.client.http; + +import com.passus.net.http.HttpHeaders; +import com.passus.net.http.HttpRequest; +import com.passus.st.client.DataEvents; +import com.passus.st.client.Event; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; + +/** + * + * @author Mirosław Hawrot + */ +public class HttpStresserWorker extends HttpClientWorker { + + private int maxRequests = -1; + + private int sentRequests = 0; + + private boolean working = true; + + public HttpStresserWorker(Emitter emitter, String name, int index) { + super(emitter, name, index); + } + + @Override + public boolean isWorking() { + return working; + } + + @Override + public int activeConnections() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void close() { + + } + + private void send(HttpRequest req) { + req.getHeaders().delete(HttpHeaders.KEEP_ALIVE); + + } + + @Override + public void close(SessionInfo session) { + throw new UnsupportedOperationException("Not supported yet."); + } + + private void waitAndClose() { + working = false; + } + + @Override + public void handle(Event event) { + if (event.getType() == DataEvents.DataLoopEnd.TYPE) { + waitAndClose(); + } else if (event.getType() != HttpSessionPayloadEvent.TYPE) { + return; + } + + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; + HttpRequest req = payloadEvent.getRequest(); + send(req); + } + + @Override + public void run() { + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Tue Oct 17 13:10:50 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Tue Oct 24 15:26:49 2017 +0200 @@ -4,7 +4,6 @@ import com.passus.net.IpAddress; import com.passus.net.SocketAddress; import com.passus.net.session.SessionBean; -import com.passus.net.session.SessionUtils; import java.text.ParseException; import java.util.Objects; import org.apache.commons.lang3.StringUtils;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Tue Oct 17 13:10:50 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Tue Oct 24 15:26:49 2017 +0200 @@ -18,7 +18,7 @@ */ public class NioChannelContext implements ChannelContext { - private final NioEmitterWorker worker; + private final NioDefaultEmitterWorker worker; private final SessionInfo sessionInfo; @@ -32,7 +32,7 @@ private SelectionKey key; - public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { + public NioChannelContext(NioDefaultEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.worker = worker; this.channel = channel; this.remoteAddress = remoteAddress;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,113 @@ +package com.passus.st.emitter.nio; + +import com.passus.data.ByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.SessionInfo; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.Queue; + +/** + * + * @author Mirosław Hawrot + */ +public class NioChannelContext2 implements ChannelContext { + + private final NioEmitterWorker2 worker; + + private final SessionInfo sessionInfo; + + private final SocketChannel channel; + + private final Queue<ByteBuffer> dataQueue; + + private SocketAddress localAddress; + + private SocketAddress remoteAddress; + + private SelectionKey key; + + /** + * Usunac + */ + public long regTime; + + public NioChannelContext2(NioEmitterWorker2 worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { + this.worker = worker; + this.channel = channel; + this.remoteAddress = remoteAddress; + this.sessionInfo = sessionInfo; + this.dataQueue = new LinkedList<>(); + + } + + Queue<ByteBuffer> dataQueue() { + return dataQueue; + } + + void selectionKey(SelectionKey key) { + this.key = key; + } + + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return channel.isConnectionPending(); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + addToQeueu(ByteBuffer.wrap(data, offset, length)); + } + + @Override + public void write(ByteBuff data) throws IOException { + addToQeueu(data.toNioByteBuffer()); + } + + @Override + public void flush() { + worker.flush(key); + } + + @Override + public void close() throws IOException { + worker.doClose(key); + } + + @Override + public SocketAddress getLocalAddress() { + try { + if (localAddress == null && channel.getLocalAddress() != null) { + localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress()); + } + } catch (Exception e) { + } + + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,518 @@ +package com.passus.st.emitter.nio; + +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.EmitterHandler; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricsContainer; +import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * + * @author Mirosław Hawrot + */ +public class NioDefaultEmitterWorker extends NioEmitterWorker { + + private final Selector selector; + + private volatile boolean working = false; + + private final Queue<Task> tasks = new ConcurrentLinkedQueue<>(); + + public NioDefaultEmitterWorker(int index) throws IOException { + super(index); + selector = Selector.open(); + } + + @Override + public void setWorking(boolean working) { + this.working = working; + if (this.working && !working) { + try { + selector.close(); + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + } + } + + public boolean isWorking() { + return working; + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { + tasks.add(new ConnectTask(sessionInfo, handler)); + selector.wakeup(); + } + + void flush(SelectionKey key) { + tasks.add(new FlushTask(key)); + selector.wakeup(); + } + + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + try { + ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to map session '{}'.", sessionInfo); + } + + if (collectMetrics) { + synchronized (metric) { + metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); + } + } + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + + SocketAddress bindAddress = connParams.getBindAddress(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); + KeyContext keyContext = new KeyContext(channelContext, handler); + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(key, ex); + } + + channelContext.selectionKey(key); + try { + channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); + } catch (Exception ex) { + doCatchException(key, ex); + return; + } + + selector.wakeup(); + } catch (Exception e) { + if (collectMetrics) { + metric.errorCaught(e); + } + logger.error(e.getMessage(), e); + } + + } + + private void doFinishConnect(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + if (logger.isDebugEnabled()) { + logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + try { + long connStart = System.currentTimeMillis(); + boolean timeouted = false; + while (!channel.finishConnect()) { + long now = System.currentTimeMillis(); + if (now - connStart < connectionTimeout) { + timeouted = true; + break; + } + } + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + + if (timeouted) { + if (logger.isDebugEnabled()) { + logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); + } + + throw new ConnectException("Connection timed out."); + + } + } catch (Exception e) { + doCatchException(key, e); + key.cancel(); + if (collectMetrics) { + metric.incConnectionsErrors(); + } + + return; + } + + try { + if (collectMetrics) { + metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); + metric.addBindSocket(keyContext.channelContext.getLocalAddress()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + keyContext.handler.channelActive(keyContext.channelContext); + setOpRead(key); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } + } + + private void doWrite(SelectionKey key) { + SocketChannel socketChannel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + if (logger.isDebugEnabled()) { + logger.debug("Writing ({} -> {}).", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + keyContext.handler.dataWriteStart(keyContext.channelContext); + Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); + int written = 0; + try { + ByteBuffer buffer; + while (!queue.isEmpty()) { + buffer = queue.poll(); + while (buffer.hasRemaining()) { + int res = socketChannel.write(buffer); + + if (res == -1) { + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateSentBytes(res); + } + + written += res; + } + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Written {}B ({} -> {})", written, + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + //TODO Operacje na handlerach powinny przechodzic przez Executor + try { + keyContext.handler.dataWritten(keyContext.channelContext); + logger.debug("Write handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + setOpRead(key); + clearOpWrite(key); + } + + private void doRead(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + if (logger.isDebugEnabled()) { + logger.debug("Reading ({} -> {})", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + ByteBuffer buffer = keyContext.buffer; + + buffer.clear(); + + ByteBuff buff = new HeapByteBuff(); + int totalReaded = 0; + int readed; + try { + while ((readed = channel.read(buffer)) > 0) { + buffer.flip(); + buff.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + totalReaded += readed; + + if (collectMetrics) { + metric.updateReceivedBytes(readed); + } + } + } catch (IOException e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Readed {}B ({} -> {})", totalReaded, + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + if (totalReaded > 0) { + try { + keyContext.handler.dataReceived(keyContext.channelContext, buff); + logger.debug("Read handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + if (readed == -1) { + doClose(key); + return; + } + + keyContext.buffer.flip(); + } + + private void doCatchException(SelectionKey key, Throwable cause) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured. " + cause.getMessage(), cause); + } + + if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + } + + KeyContext keyContext = (KeyContext) key.attachment(); + + try { + keyContext.handler.errorOccured(keyContext.channelContext, cause); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + private void doClose(SelectionKey key) { + if (!key.channel().isOpen()) { + selector.wakeup(); + return; + } + + try { + key.channel().close(); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + KeyContext keyContext = (KeyContext) key.attachment(); + if (logger.isDebugEnabled()) { + logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + try { + keyContext.handler.channelInactive(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + key.cancel(); + try { + keyContext.handler.channelUnregistered(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + + selector.wakeup(); + } + + static void setOpRead(SelectionKey key) { + if (!key.isValid() || key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + key.selector().wakeup(); + } + + static void clearOpRead(SelectionKey key) { + if (!key.isValid() || !key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + key.selector().wakeup(); + } + + static void setOpWrite(SelectionKey key) { + if (!key.isValid() || key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + key.selector().wakeup(); + } + + static void clearOpWrite(SelectionKey key) { + if (!key.isValid() || !key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + key.selector().wakeup(); + } + + void requestClose(SelectionKey key) { + tasks.add(new CloseTask(key)); + key.selector().wakeup(); + } + + @Override + public void run() { + int selected = 0; + working = true; + while (working) { + if (!tasks.isEmpty()) { + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + } else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doConnect(taskConn.sessionInfo, taskConn.handler); + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); + } + } + } + + try { + selected = selector.select(selectTimeout); + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isConnectable()) { + doFinishConnect(key); + } else if (key.isWritable()) { + doWrite(key); + } else if (key.isReadable()) { + doRead(key); + } + } + } + } + } + + private static class KeyContext { + + private final EmitterHandler handler; + + private final NioChannelContext channelContext; + + private ByteBuffer buffer = ByteBuffer.allocate(1024); + + public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { + this.channelContext = channelContext; + this.handler = handler; + } + + } + + private static abstract class Task { + + public static final int CLOSE = 1; + public static final int CONNECT = 2; + public static final int FLUSH = 3; + + private final int code; + + public Task(int code) { + this.code = code; + } + + } + + private final static class ConnectTask extends Task { + + private final SessionInfo sessionInfo; + private final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { + super(CONNECT); + this.sessionInfo = sessionInfo; + this.handler = handler; + } + + } + + private final static class CloseTask extends Task { + + private final SelectionKey key; + + public CloseTask(SelectionKey key) { + super(CLOSE); + this.key = key; + } + + } + + private final static class FlushTask extends Task { + + private final SelectionKey key; + + public FlushTask(SelectionKey key) { + super(FLUSH); + this.key = key; + } + + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Tue Oct 17 13:10:50 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Tue Oct 24 15:26:49 2017 +0200 @@ -35,6 +35,17 @@ private long connectionTimeout = 5_000; + private final Class<? extends NioEmitterWorker> workerClass; + + public NioEmitter() { + this(NioDefaultEmitterWorker.class); + } + + public NioEmitter(Class<? extends NioEmitterWorker> workerClass) { + Assert.notNull(workerClass, "workerClass"); + this.workerClass = workerClass; + } + public int getMaxThreads() { return maxThreads; } @@ -123,7 +134,7 @@ workers = new NioEmitterWorker[maxThreads]; for (int i = 0; i < maxThreads; i++) { try { - NioEmitterWorker worker = new NioEmitterWorker(i); + NioEmitterWorker worker = workerClass.getConstructor(Integer.TYPE).newInstance(i); worker.setSessionMapper(sessionMapper); worker.setCollectMetrics(collectMetrics); worker.setConnectionTimeout(connectionTimeout);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Tue Oct 17 13:10:50 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Tue Oct 24 15:26:49 2017 +0200 @@ -1,596 +1,102 @@ -package com.passus.st.emitter.nio; - -import com.passus.st.emitter.SessionMapper; -import com.passus.commons.Assert; -import com.passus.data.ByteBuff; -import com.passus.data.HeapByteBuff; -import com.passus.net.SocketAddress; -import com.passus.net.utils.AddressUtils; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.EmitterMetric; -import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; -import com.passus.st.emitter.SessionMapper.ConnectionParams; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.metric.MetricSource; -import com.passus.st.metric.MetricsContainer; -import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * - * @author Mirosław Hawrot - */ -public class NioEmitterWorker extends Thread implements MetricSource { - - private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class); - - private int index; - - private final Selector selector; - - private volatile boolean working = false; - - private long selectTimeout = 100; - - private Queue<Task> tasks = new ConcurrentLinkedQueue<>(); - - private SessionMapper sessionMapper; - - private volatile boolean collectMetrics = false; - - private long connectionTimeout = 5_000; - - private EmitterMetric metric; - - NioEmitterWorker(int index) throws IOException { - super("NioEmitterWorker-" + index); - selector = Selector.open(); - } - - int getIndex() { - return index; - } - - public long getSelectTimeout() { - return selectTimeout; - } - - public void setSelectTimeout(long selectTimeout) { - Assert.greaterThanZero(selectTimeout, "selectTimeout"); - this.selectTimeout = selectTimeout; - } - - public long getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(long connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - public void setSessionMapper(SessionMapper sessionMapper) { - this.sessionMapper = sessionMapper; - } - - public void setWorking(boolean working) { - this.working = working; - if (this.working && !working) { - try { - selector.close(); - } catch (IOException ex) { - LOGGER.warn(ex.getMessage(), ex); - } - } - } - - @Override - public boolean isCollectMetrics() { - return collectMetrics; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - if (collectMetrics && metric == null) { - metric = new EmitterMetric(); - try { - metric.activate(); - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Error occured during metric activation. " + e.getMessage(), e); - } - } - this.collectMetrics = true; - } else if (!collectMetrics && metric != null) { - this.collectMetrics = false; - try { - metric.deactivate(); - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e); - } - } - - metric = null; - } - } - - public boolean isWorking() { - return working; - } - - @Override - public void writeMetrics(MetricsContainer container) { - if (collectMetrics) { - container.update(System.currentTimeMillis(), metric); - metric.reset(); - } - } - - public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { - tasks.add(new ConnectTask(sessionInfo, handler)); - selector.wakeup(); - } - - void flush(SelectionKey key) { - tasks.add(new FlushTask(key)); - selector.wakeup(); - } - - private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { - try { - ConnectionParams connParams = sessionMapper.map(sessionInfo); - if (connParams == null) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Unable to map session '" + sessionInfo + "'."); - } - - if (collectMetrics) { - synchronized (metric) { - metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); - } - } - - try { - handler.sessionInvalidated(sessionInfo); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'."); - } - - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - - SocketAddress bindAddress = connParams.getBindAddress(); - if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { - channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); - } - - SocketAddress remoteAddress = connParams.getRemoteAddress(); - if (remoteAddress == null) { - remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); - } - - NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); - KeyContext keyContext = new KeyContext(channelContext, handler); - SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); - try { - handler.channelRegistered(channelContext); - } catch (Exception ex) { - doCatchException(key, ex); - } - - channelContext.selectionKey(key); - try { - channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); - } catch (Exception ex) { - doCatchException(key, ex); - return; - } - - selector.wakeup(); - } catch (Exception e) { - if (collectMetrics) { - metric.errorCaught(e); - } - LOGGER.error(e.getMessage(), e); - } - - } - - private void doFinishConnect(SelectionKey key) { - SocketChannel channel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); - } - - try { - long connStart = System.currentTimeMillis(); - boolean timeouted = false; - while (!channel.finishConnect()) { - long now = System.currentTimeMillis(); - if (now - connStart < connectionTimeout) { - timeouted = true; - break; - } - } - key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); - - if (timeouted) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); - } - - throw new ConnectException("Connection timed out."); - - } - } catch (Exception e) { - doCatchException(key, e); - key.cancel(); - if (collectMetrics) { - metric.incConnectionsErrors(); - } - - return; - } - - try { - if (collectMetrics) { - metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); - metric.addBindSocket(keyContext.channelContext.getLocalAddress()); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); - } - - keyContext.handler.channelActive(keyContext.channelContext); - setOpRead(key); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - } - } - - private void doWrite(SelectionKey key) { - SocketChannel socketChannel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Writing ({} -> {}).", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - keyContext.handler.dataWriteStart(keyContext.channelContext); - Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); - int written = 0; - try { - ByteBuffer buffer; - while (!queue.isEmpty()) { - buffer = queue.poll(); - while (buffer.hasRemaining()) { - int res = socketChannel.write(buffer); - - if (res == -1) { - doClose(key); - return; - } - - if (collectMetrics) { - metric.updateSentBytes(res); - } - - written += res; - } - } - } catch (Exception e) { - doCatchException(key, e); - doClose(key); - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Written {}B ({} -> {})", written, - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - //TODO Operacje na handlerach powinny przechodzic przez Executor - try { - keyContext.handler.dataWritten(keyContext.channelContext); - LOGGER.debug("Write handled."); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - setOpRead(key); - clearOpWrite(key); - } - - private void doRead(SelectionKey key) { - SocketChannel channel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Reading ({} -> {})", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - ByteBuffer buffer = keyContext.buffer; - - buffer.clear(); - - ByteBuff buff = new HeapByteBuff(); - int totalReaded = 0; - int readed; - try { - while ((readed = channel.read(buffer)) > 0) { - buffer.flip(); - buff.append(buffer.array(), buffer.position(), buffer.limit()); - buffer.clear(); - totalReaded += readed; - - if (collectMetrics) { - metric.updateReceivedBytes(readed); - } - } - } catch (IOException e) { - doCatchException(key, e); - doClose(key); - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Readed {}B ({} -> {})", totalReaded, - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - if (totalReaded > 0) { - try { - keyContext.handler.dataReceived(keyContext.channelContext, buff); - LOGGER.debug("Read handled."); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - } - - if (readed == -1) { - doClose(key); - return; - } - - keyContext.buffer.flip(); - } - - private void doCatchException(SelectionKey key, Throwable cause) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Error occured. " + cause.getMessage(), cause); - } - - if (collectMetrics) { - synchronized (metric) { - metric.errorCaught(cause); - } - } - - KeyContext keyContext = (KeyContext) key.attachment(); - - try { - keyContext.handler.errorOccured(keyContext.channelContext, cause); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - } - - private void doClose(SelectionKey key) { - if (!key.channel().isOpen()) { - selector.wakeup(); - return; - } - - try { - key.channel().close(); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - KeyContext keyContext = (KeyContext) key.attachment(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); - } - - try { - keyContext.handler.channelInactive(keyContext.channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - key.cancel(); - try { - keyContext.handler.channelUnregistered(keyContext.channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); - } - - if (collectMetrics) { - metric.incClosedConnections(); - } - - selector.wakeup(); - } - - static void setOpRead(SelectionKey key) { - if (!key.isValid() || key.isReadable()) { - return; - } - - key.interestOps(key.interestOps() | SelectionKey.OP_READ); - key.selector().wakeup(); - } - - static void clearOpRead(SelectionKey key) { - if (!key.isValid() || !key.isReadable()) { - return; - } - - key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); - key.selector().wakeup(); - } - - static void setOpWrite(SelectionKey key) { - if (!key.isValid() || key.isWritable()) { - return; - } - - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - key.selector().wakeup(); - } - - static void clearOpWrite(SelectionKey key) { - if (!key.isValid() || !key.isWritable()) { - return; - } - - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - key.selector().wakeup(); - } - - void requestClose(SelectionKey key) { - tasks.add(new CloseTask(key)); - key.selector().wakeup(); - } - - @Override - public void run() { - int selected = 0; - working = true; - while (working) { - if (!tasks.isEmpty()) { - Task task; - while ((task = tasks.poll()) != null) { - if (task.code == Task.CLOSE) { - doClose(((CloseTask) task).key); - } else if (task.code == Task.CONNECT) { - ConnectTask taskConn = (ConnectTask) task; - doConnect(taskConn.sessionInfo, taskConn.handler); - } else if (task.code == Task.FLUSH) { - FlushTask flushTask = (FlushTask) task; - setOpWrite(flushTask.key); - } - } - } - - try { - selected = selector.select(selectTimeout); - } catch (IOException ex) { - LOGGER.warn(ex.getMessage(), ex); - } - - if (selected > 0) { - Iterator<SelectionKey> it = selector.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey key = it.next(); - it.remove(); - - if (!key.isValid()) { - continue; - } - - if (key.isConnectable()) { - doFinishConnect(key); - } else if (key.isWritable()) { - doWrite(key); - } else if (key.isReadable()) { - doRead(key); - } - } - } - } - } - - private static class KeyContext { - - private final EmitterHandler handler; - - private final NioChannelContext channelContext; - - private ByteBuffer buffer = ByteBuffer.allocate(1024); - - public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { - this.channelContext = channelContext; - this.handler = handler; - } - - } - - private static abstract class Task { - - public static final int CLOSE = 1; - public static final int CONNECT = 2; - public static final int FLUSH = 3; - - private final int code; - - public Task(int code) { - this.code = code; - } - - } - - private final static class ConnectTask extends Task { - - private final SessionInfo sessionInfo; - private final EmitterHandler handler; - - public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { - super(CONNECT); - this.sessionInfo = sessionInfo; - this.handler = handler; - } - - } - - private final static class CloseTask extends Task { - - private final SelectionKey key; - - public CloseTask(SelectionKey key) { - super(CLOSE); - this.key = key; - } - - } - - private final static class FlushTask extends Task { - - private final SelectionKey key; - - public FlushTask(SelectionKey key) { - super(FLUSH); - this.key = key; - } - - } -} +package com.passus.st.emitter.nio; + +import com.passus.commons.Assert; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.EmitterMetric; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.metric.MetricSource; +import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author Mirosław Hawrot + */ +public abstract class NioEmitterWorker extends Thread implements MetricSource { + + protected final Logger logger = LogManager.getLogger(getClass()); + + private final int index; + + protected long selectTimeout = 100; + + protected SessionMapper sessionMapper; + + protected volatile boolean collectMetrics = false; + + protected long connectionTimeout = 5_000; + + protected EmitterMetric metric; + + protected NioEmitterWorker(int index) throws IOException { + super("NioEmitterWorker-" + index); + this.index = index; + } + + public int getIndex() { + return index; + } + + public abstract void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException; + + public abstract void setWorking(boolean b); + + public long getSelectTimeout() { + return selectTimeout; + } + + public void setSelectTimeout(long selectTimeout) { + Assert.greaterThanZero(selectTimeout, "selectTimeout"); + this.selectTimeout = selectTimeout; + } + + public long getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(long connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + if (collectMetrics && metric == null) { + metric = new EmitterMetric(); + try { + metric.activate(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured during metric activation. " + e.getMessage(), e); + } + } + this.collectMetrics = true; + } else if (!collectMetrics && metric != null) { + this.collectMetrics = false; + try { + metric.deactivate(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured during metric deactivation. " + e.getMessage(), e); + } + } + + metric = null; + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker2.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,787 @@ +package com.passus.st.emitter.nio; + +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.EmitterMetric; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricsContainer; +import java.io.IOException; +import java.net.ConnectException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; + +/** + * + * @author Mirosław Hawrot + */ +public class NioEmitterWorker2 extends NioEmitterWorker { + + private final Selector selector; + + private volatile boolean working = false; + + private final Queue<Task> tasks = new ConcurrentLinkedQueue<>(); + + private boolean reusePort = true; + + private boolean tcpNoDelay = true; + + private final AtomicBoolean wakeUp = new AtomicBoolean(false); + + private final Object selectorLock1 = new Object(); + + private final Object selectorLock2 = new Object(); + + private long wakeUpTime; + + public NioEmitterWorker2(int index) throws IOException { + super(index); + selector = Selector.open(); + } + + public boolean isReusePort() { + return reusePort; + } + + public void setReusePort(boolean reusePort) { + this.reusePort = reusePort; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + if (collectMetrics && metric == null) { + metric = new EmitterMetric(); + try { + metric.activate(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured during metric activation. " + e.getMessage(), e); + } + } + this.collectMetrics = true; + } else if (!collectMetrics && metric != null) { + this.collectMetrics = false; + try { + metric.deactivate(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured during metric deactivation. " + e.getMessage(), e); + } + } + + metric = null; + } + } + + public boolean isWorking() { + return working; + } + + @Override + public void setWorking(boolean working) { + if (this.working && !working) { + this.working = false; + tasks.clear(); + wakeUp(); + + try { + selector.close(); + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + + interrupt(); + + try { + join(); + } catch (Exception e) { + } + + } + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { + doRegister(sessionInfo, handler); + } + + void flush(SelectionKey key) { + tasks.add(new FlushTask(key)); + selector.wakeup(); + } + + private void doRegister(SessionInfo sessionInfo, EmitterHandler handler) { + long t0 = System.currentTimeMillis(); + try { + /*ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to map session '" + sessionInfo + "'."); + } + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + return; + }*/ + + if (logger.isDebugEnabled()) { + //logger.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'."); + logger.debug("Registering session '" + sessionInfo + "'."); + } + + SocketChannel channel = SocketChannel.open(); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reusePort); + channel.setOption(StandardSocketOptions.SO_LINGER, 0); + channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); + + channel.configureBlocking(false); + + //SocketAddress bindAddress = connParams.getBindAddress(); + /*if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); + }*/ + + /*SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + }*/ + SocketAddress remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + + NioChannelContext2 channelContext = new NioChannelContext2(this, channel, remoteAddress, sessionInfo); + KeyContext keyContext = new KeyContext(channelContext, handler); + + SelectionKey key; + synchronized (selectorLock2) { + wakeUp(); + + synchronized (selectorLock1) { + key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); + channelContext.regTime = System.currentTimeMillis(); + } + + channelContext.selectionKey(key); + + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(key, ex); + } + + try { + channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); + } catch (Exception ex) { + doCatchException(key, ex); + doClose(key); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Session registered '{}'.", sessionInfo); + } + } + + wakeUp(); + } catch (Exception e) { + if (collectMetrics) { + metric.errorCaught(e); + } + logger.error(e.getMessage(), e); + } + + /*long t1 = System.currentTimeMillis(); + if (t1 - t0 > 2) { + System.out.println("Register: " + (t1 - t0)); + }*/ + } + + private void doFinishConnect(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + long t0 = System.currentTimeMillis(); + if (logger.isDebugEnabled()) { + logger.debug("Connecting to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + try { + long connStart = System.currentTimeMillis(); + boolean timeouted = false; + while (!channel.finishConnect()) { + long now = System.currentTimeMillis(); + if (now - connStart < connectionTimeout) { + timeouted = true; + break; + } + } + + long t1 = System.currentTimeMillis(); + if (t1 - keyContext.channelContext.regTime > 10) { + System.out.println("--Connect delay: " + (t1 - keyContext.channelContext.regTime)); + } + + clearOpConnect(key); + setOpRead(key); + if (timeouted) { + if (logger.isDebugEnabled()) { + logger.debug("Connection '{}' timed out.", keyContext.channelContext.getRemoteAddress()); + } + + throw new ConnectException("Connection timed out."); + + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + if (collectMetrics) { + metric.incConnectionsErrors(); + } + + return; + } + + try { + if (collectMetrics) { + metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); + metric.addBindSocket(keyContext.channelContext.getLocalAddress()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + keyContext.handler.channelActive(keyContext.channelContext); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } + + /*long t1 = System.currentTimeMillis(); + if (t1 - t0 > 2) { + System.out.println("Finish connect: " + (t1 - t0)); + }*/ + } + + private void doWrite(SelectionKey key) { + SocketChannel socketChannel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + if (logger.isDebugEnabled()) { + logger.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); + int written = 0; + try { + ByteBuffer buffer; + while (!queue.isEmpty()) { + buffer = queue.poll(); + while (buffer.hasRemaining()) { + int res = socketChannel.write(buffer); + + if (res == -1) { + doClose(key); + return; + } + + written += res; + } + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateSentBytes(written); + } + + if (logger.isDebugEnabled()) { + logger.debug("Written '" + written + "' to '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + //TODO Operacje na handlerach powinny przechodzic przez Executor + try { + keyContext.handler.dataWritten(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + setOpRead(key); + clearOpWrite(key); + } + + private int read(SocketChannel channel, ByteBuffer buffer, ByteBuff out) throws IOException { + int readed; + int totalReaded = 0; + while ((readed = channel.read(buffer)) > 0) { + buffer.flip(); + out.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + if (readed < 0) { + return -1; + } + + totalReaded += readed; + } + + return totalReaded; + } + + private void doRead(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + if (logger.isDebugEnabled()) { + logger.debug("Reading from '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + ByteBuffer buffer = keyContext.buffer; + + keyContext.buffer.clear(); + + ByteBuff buff = new HeapByteBuff(); + int totalReaded = 0; + int readed; + try { + while ((readed = channel.read(buffer)) > 0) { + buffer.flip(); + buff.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + totalReaded += readed; + } + } catch (IOException e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateReceivedBytes(totalReaded); + } + + if (logger.isDebugEnabled()) { + logger.debug(totalReaded + " readed from '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + try { + keyContext.handler.dataReceived(keyContext.channelContext, buff); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + if (readed == -1) { + doClose(key); + return; + } + + keyContext.buffer.flip(); + } + + private void doCatchException(SelectionKey key, Throwable cause) { + if (logger.isDebugEnabled()) { + logger.debug("Error occured. " + cause.getMessage(), cause); + } + + if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + } + + KeyContext keyContext = (KeyContext) key.attachment(); + + try { + keyContext.handler.errorOccured(keyContext.channelContext, cause); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + void doClose(SelectionKey key) { + if (!key.isValid()) { + return; + } + + KeyContext keyContext = (KeyContext) key.attachment(); + if (key.channel().isOpen()) { + try { + key.channel().close(); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + if (logger.isDebugEnabled()) { + logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + try { + keyContext.handler.channelInactive(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + } + + private void sslDoHandshake(SocketChannel socketChannel, SSLEngine engine, + ByteBuffer myNetData, ByteBuffer peerNetData) throws Exception { + + // Create byte buffers to use for holding application data + int appBufferSize = engine.getSession().getApplicationBufferSize(); + ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize); + ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize); + + // Begin handshake + engine.beginHandshake(); + SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus(); + + // Process handshaking message + while (hs != SSLEngineResult.HandshakeStatus.FINISHED + && hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { + switch (hs) { + case NEED_UNWRAP: + // Receive handshaking data from peer + if (socketChannel.read(peerNetData) < 0) { + // Handle closed channel + } + + // Process incoming handshaking data + peerNetData.flip(); + SSLEngineResult res = engine.unwrap(peerNetData, peerAppData); + peerNetData.compact(); + hs = res.getHandshakeStatus(); + + switch (res.getStatus()) { + case OK: + // Handle OK status + break; + case BUFFER_UNDERFLOW: + break; + case BUFFER_OVERFLOW: + break; + case CLOSED: + break; + // Handle other status: BUFFER_UNDERFLOW, BUFFER_OVERFLOW, CLOSED + } + break; + + case NEED_WRAP: + // Empty the local network packet buffer. + myNetData.clear(); + + // Generate handshaking data + res = engine.wrap(myAppData, myNetData); + hs = res.getHandshakeStatus(); + + // Check status + switch (res.getStatus()) { + case OK: + myNetData.flip(); + + // Send the handshaking data to peer + while (myNetData.hasRemaining()) { + if (socketChannel.write(myNetData) < 0) { + // Handle closed channel + } + } + break; + + // Handle other status: BUFFER_OVERFLOW, BUFFER_UNDERFLOW, CLOSED + } + break; + + case NEED_TASK: + // Handle blocking tasks + break; + + // Handle other status: // FINISHED or NOT_HANDSHAKING + } + } + // Processes after handshaking + } + + private void sslDoWrap(SocketChannel socketChannel) { + + } + + private void sslDoUnwrap(SocketChannel socketChannel, SSLEngine engine, ByteBuffer netData, ByteBuffer appData) { + SSLEngineResult res; + try { + res = engine.unwrap(netData, appData); + } catch (Exception e) { + return; + } + + switch (res.getStatus()) { + case OK: + break; + case CLOSED: + break; + case BUFFER_OVERFLOW: + // Maybe need to enlarge the peer application data buffer. + if (engine.getSession().getApplicationBufferSize() + > appData.capacity()) { + // enlarge the peer application data buffer + } else { + // compact or clear the buffer + } + // retry the operation + break; + + case BUFFER_UNDERFLOW: + // Maybe need to enlarge the peer network packet buffer + if (engine.getSession().getPacketBufferSize() + > netData.capacity()) { + // enlarge the peer network packet buffer + } else { + // compact or clear the buffer + } + // obtain more inbound network data and then retry the operation + break; + } + } + + private void wakeUp() { + if (wakeUp.compareAndSet(false, true)) { + wakeUpTime = System.currentTimeMillis(); + selector.wakeup(); + } + } + + private void clearOpConnect(SelectionKey key) { + if (!key.isValid() || !key.isConnectable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + } + + private void setOpRead(SelectionKey key) { + if (!key.isValid() || key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + wakeUp(); + } + + private void clearOpRead(SelectionKey key) { + if (!key.isValid() || !key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + } + + private void setOpWrite(SelectionKey key) { + if (!key.isValid() || key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + wakeUp(); + } + + private void clearOpWrite(SelectionKey key) { + if (!key.isValid() || !key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + } + + void requestClose(SelectionKey key) { + tasks.add(new CloseTask(key)); + wakeUp(); + } + + @Override + public void run() { + int selected = 0; + working = true; + while (working) { + long t0 = System.currentTimeMillis(); + if (!tasks.isEmpty()) { + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + /*} else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doRegister(taskConn.sessionInfo, taskConn.handler);*/ + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); + } + } + } + /*long t1 = System.currentTimeMillis(); + if (t1 - t0 > 2) { + System.out.println("tasks loop: " + (t1 - t0)); + }*/ + + synchronized (selectorLock1) { + try { + //long t0 = System.currentTimeMillis(); + selected = selector.select(selectTimeout); + wakeUp.set(false); + /*if (this.wakeUpTime != 0) { + long wakeUpTime = System.currentTimeMillis(); + + if (wakeUpTime - this.wakeUpTime > 2) { + System.out.println("wakeUpTime: " + (wakeUpTime - this.wakeUpTime)); + } + this.wakeUpTime = 0; + }*/ + + //long t1 = System.currentTimeMillis(); + /*if (t1 - t0 > 2) { + System.out.println("select: " + (t1 - t0) + " ms"); + }*/ + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + } + + t0 = System.currentTimeMillis(); + synchronized (selectorLock2) { + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + try { + KeyContext keyContext = (KeyContext) key.attachment(); + keyContext.handler.channelUnregistered(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + continue; + } + + if (key.isConnectable()) { + doFinishConnect(key); + } else if (key.isWritable()) { + doWrite(key); + } else if (key.isReadable()) { + doRead(key); + } + } + } + } + + /*t1 = System.currentTimeMillis(); + if (t1 - t0 > 2) { + System.out.println("selected loop: " + (t1 - t0)); + }*/ + } + } + + private static class KeyContext { + + private final EmitterHandler handler; + + private final NioChannelContext2 channelContext; + + private ByteBuffer buffer = ByteBuffer.allocate(1024); + + public KeyContext(NioChannelContext2 channelContext, EmitterHandler handler) { + this.channelContext = channelContext; + this.handler = handler; + } + + } + + private static abstract class Task { + + public static final int CLOSE = 1; + public static final int CONNECT = 2; + public static final int FLUSH = 3; + + private final int code; + + public Task(int code) { + this.code = code; + } + + } + + private final static class ConnectTask extends Task { + + private final SessionInfo sessionInfo; + private final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { + super(CONNECT); + this.sessionInfo = sessionInfo; + this.handler = handler; + } + + } + + private final static class CloseTask extends Task { + + private final SelectionKey key; + + public CloseTask(SelectionKey key) { + super(CLOSE); + this.key = key; + } + + } + + private final static class FlushTask extends Task { + + private final SelectionKey key; + + public FlushTask(SelectionKey key) { + super(FLUSH); + this.key = key; + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterWorker2Test.java Tue Oct 24 15:26:49 2017 +0200 @@ -0,0 +1,156 @@ +package com.passus.st.emitter.nio; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.http.HttpConsts; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpRequestBuilder; +import com.passus.net.http.HttpRequestEncoder; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.client.TestClientHandler; +import com.passus.st.client.TestClientHandler.ClientEvent; +import com.passus.st.client.TestClientHandler.EventType; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.SessionInfo; +import java.net.ConnectException; +import org.testng.AssertJUnit; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import org.testng.annotations.Test; + +/** + * + * @author Mirosław Hawrot + */ +public class NioEmitterWorker2Test extends AbstractWireMockTest { + + private void waitConn(TestClientHandler handler, int expectedEventsSize) { + waitConn(handler, expectedEventsSize, 5_000); + } + + private void waitConn(TestClientHandler handler, int expectedEventsSize, long timeout) { + long endTime = System.currentTimeMillis() + timeout; + if (endTime <= 0) { + endTime = Long.MAX_VALUE; + } + + while (true) { + if (handler.size() >= expectedEventsSize + || System.currentTimeMillis() >= endTime + || handler.isChannelUnregistered()) { + break; + } + + try { + Thread.sleep(100); + } catch (Exception ignore) { + } + } + } + + @Test(enabled = true) + public void testConnectAndClose() throws Exception { + NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class); + emitter.setMaxThreads(1); + emitter.start(); + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, PORT); + + TestClientHandler handler = new TestClientHandler() { + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.close(); + } + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 3); + + AssertJUnit.assertEquals(3, handler.size()); + AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + AssertJUnit.assertEquals(EventType.CHANNEL_ACTIVE, handler.get(1).getType()); + AssertJUnit.assertEquals(EventType.CHANNEL_INACTIVE, handler.get(2).getType()); + } + + @Test(enabled = true) + public void testWriteAndRead() throws Exception { + String path = "/some/thing"; + String url = "http://localhost" + path; + HttpRequest req = HttpRequestBuilder.get(url).version(HttpConsts.VERSION_1_0).build(); + String content = "Hello world!"; + stubFor(get(urlEqualTo("/some/thing")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + + NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class); + emitter.setMaxThreads(1); + emitter.start(); + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, PORT); + + TestClientHandler handler = new TestClientHandler() { + + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + ByteBuff buff = new HeapByteBuff(); + HttpRequestEncoder.getInstance().encode(req, buff); + context.writeAndFlush(buff); + } + + @Override + protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception { + if (data.toString().contains("!")) { + context.close(); + } + } + + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 20, 5_000); + + assertTrue(handler.size() >= 6); + int index = 0; + assertEquals(EventType.CHANNEL_REGISTERED, handler.get(index++).getType()); + assertEquals(EventType.CHANNEL_ACTIVE, handler.get(index++).getType()); + assertEquals(EventType.DATA_WRITTEN, handler.get(index++).getType()); + + ByteBuff resContentBuff = new HeapByteBuff(); + + do { + ClientEvent event = handler.get(index++); + assertEquals(EventType.DATA_RECEIVED, event.getType()); + resContentBuff.append(event.getData()); + } while (handler.get(index).getType() == EventType.DATA_RECEIVED); + + assertEquals(EventType.CHANNEL_INACTIVE, handler.get(index++).getType()); + /*assertEquals(EventType.CHANNEL_UNREGISTERED, handler.get(index++).getType());*/ + String responseContent = resContentBuff.toString(); + assertTrue(responseContent.endsWith(content)); + } + + @Test(enabled = true) + public void testError_ConnectionRefused() throws Exception { + NioEmitter emitter = new NioEmitter(NioEmitterWorker2.class); + emitter.setMaxThreads(1); + emitter.start(); + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, 10_000); + + TestClientHandler handler = new TestClientHandler() { + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.close(); + } + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 2); + + AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + ClientEvent clientEvent = handler.get(1); + AssertJUnit.assertEquals(EventType.ERROR_OCCURED, clientEvent.getType()); + AssertJUnit.assertTrue(clientEvent.getCause() instanceof ConnectException); + AssertJUnit.assertTrue(clientEvent.getCause().getMessage().startsWith("Connection refused")); + } +}