Mercurial > stress-tester
changeset 620:811aa52e7ebe http-asynch-worker
in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Main.java Fri Oct 13 07:34:51 2017 +0200 @@ -242,11 +242,13 @@ client.setCollectMetrics(true); client.setConnectPartialSession(cl.hasOption("ps")); client.setWokerType(cl.getOptionValue("wt", "synch")); - client.addListener((request, response, context) -> { + /*client.addListener((request, response, context) -> { if (startTime == 0) { startTime = System.currentTimeMillis(); } - }); + });*/ + + startTime = System.currentTimeMillis(); if (cl.hasOption("pr")) { if (clArgs.length != 1) { throw new IllegalArgumentException("Parameter \"parallelReplays\" works only for one pcap file."); @@ -263,6 +265,7 @@ client.setWorkersNum(clArgs.length); client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher()); } + emitter.setMaxThreads(1); emitter.start(); if (cl.hasOption("rs")) { @@ -438,26 +441,28 @@ collector.collect(); - executor.shutdownNow(); + for (int i = 0; i < clArgs.length; i++) { eventSrcs[i].stop(); } client.stop(); emitter.stop(); + + collector.flush(true); + executor.shutdownNow(); + printMetrics(); + if (reporterClient != null) { reporterClient.send(new MapMetric("endOfReplay", Collections.emptyMap())); reporterClient.waitForEmptyQueue(); reporterClient.stop(); System.out.println("Dropped reporter messages: " + reporterClient.getDroppedMessages()); } + if (summaryListener != null) { summaryListener.close(); } - - collector.flush(true); - - printMetrics(); } catch (ParseException e) { System.out.println(e.getMessage()); printHelp(options);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/Test.java Fri Oct 13 07:34:51 2017 +0200 @@ -0,0 +1,630 @@ +package com.passus.st; + +import com.passus.net.SocketAddress; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.PassThroughSessionMapper; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.nio.NioEmitterWorkerNoExecutors; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.StandardSocketOptions; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author Mirosław Hawrot + */ +public class Test { + + private int threadsNum = 4; + + private int loops = 10; + + private int connsPerLoop = 40; + + private boolean reuseAddress = true; + + private String host = "127.0.0.1"; + + private int port = 80; + + public static void main(String[] args) throws Exception { + //Log4jConfigurationFactory.enableFactory("debug"); + Test test = new Test(); + String testName = "all"; + if (args.length == 1) { + testName = args[0]; + } + + test.run(testName); + } + + public void run(String testName) throws Exception { + System.out.println("threads: " + threadsNum + " loops: " + loops + " connsPerLoop: " + connsPerLoop + " reuseAddress: " + reuseAddress); + if (testName.equals("all") || testName.equalsIgnoreCase("nio")) { + runNioTest(); + } + + if (testName.equals("all") || testName.equalsIgnoreCase("nio2")) { + runNio2Test(); + } + + if (testName.equals("all") || testName.equalsIgnoreCase("socket")) { + runSocketTest(); + } + + if (testName.equals("all") || testName.equalsIgnoreCase("threadPerSocket")) { + runThreadPerSocketTest(); + } + + if (testName.equals("all") || testName.equalsIgnoreCase("nioEmitter")) { + runNio2Test(); + runNioEmitterWorker(); + } + + //runNioEmitterWorker(); + } + + public void runSocketTest() throws Exception { + System.out.println("Socket"); + + long startTime = System.currentTimeMillis(); + + SocketBunchThread[] threads = new SocketBunchThread[threadsNum]; + for (int i = 0; i < threadsNum; i++) { + threads[i] = new SocketBunchThread(); + threads[i].loops = loops; + threads[i].connNum = connsPerLoop; + threads[i].reuseAddress = reuseAddress; + threads[i].remote = new InetSocketAddress(host, port); + threads[i].start(); + } + + int totalConn = 0; + for (int i = 0; i < threadsNum; i++) { + threads[i].join(); + totalConn += threads[i].totalConn; + } + + System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn); + for (int i = 0; i < threadsNum; i++) { + System.out.println("time thread" + i + ": " + threads[i].duration); + } + } + + public void runThreadPerSocketTest() throws Exception { + System.out.println("ThreadPerSocket"); + + long startTime = System.currentTimeMillis(); + + SocketThread[] threads = new SocketThread[connsPerLoop]; + InetSocketAddress remote = new InetSocketAddress(host, port); + int totalConn = 0; + + for (int i = 0; i < loops * threadsNum; i++) { + for (int j = 0; j < connsPerLoop; j++) { + threads[j] = new SocketThread(remote); + threads[j].start(); + totalConn++; + } + + for (int j = 0; j < connsPerLoop; j++) { + threads[j].join(); + } + + for (int j = 0; j < connsPerLoop; j++) { + threads[j].socket.close(); + } + } + + System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn); + } + + public void runNioTest() throws Exception { + System.out.println("Nio"); + + long startTime = System.currentTimeMillis(); + + NioThread[] threads = new NioThread[threadsNum]; + for (int i = 0; i < threadsNum; i++) { + threads[i] = new NioThread(); + threads[i].loops = loops; + threads[i].connNum = connsPerLoop; + threads[i].reuseAddress = reuseAddress; + threads[i].remote = new InetSocketAddress(host, port); + threads[i].start(); + } + + int totalConn = 0; + for (int i = 0; i < threadsNum; i++) { + threads[i].join(); + totalConn += threads[i].totalConn; + } + + System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn); + for (int i = 0; i < threadsNum; i++) { + System.out.println("time thread" + i + ": " + threads[i].duration); + } + + } + + public void runNio2Test() throws Exception { + System.out.println("Nio2"); + + long startTime = System.currentTimeMillis(); + + Nio2Thread[] threads = new Nio2Thread[threadsNum]; + for (int i = 0; i < threadsNum; i++) { + threads[i] = new Nio2Thread(); + threads[i].loops = loops; + threads[i].connNum = connsPerLoop; + threads[i].reuseAddress = reuseAddress; + threads[i].remote = new InetSocketAddress(host, port); + threads[i].start(); + } + + int totalConn = 0; + for (int i = 0; i < threadsNum; i++) { + threads[i].join(); + totalConn += threads[i].totalConn; + } + + System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn); + for (int i = 0; i < threadsNum; i++) { + System.out.println("time thread" + i + ": " + threads[i].duration); + } + + } + + public void runNioEmitterWorker() throws Exception { + System.out.println("NioEmitterWorker"); + NioEmitterWorkerNoExecutors[] workers = new NioEmitterWorkerNoExecutors[threadsNum]; + + for (int i = 0; i < threadsNum; i++) { + workers[i] = new NioEmitterWorkerNoExecutors(i); + /*threads[i].loops = loops; + threads[i].connNum = connsPerLoop; + threads[i].reuseAddress = reuseAddress;*/ + workers[i].setReusePort(reuseAddress); + workers[i].setSessionMapper(new PassThroughSessionMapper()); + workers[i].start(); + } + + SessionInfo[] sessions = new SessionInfo[connsPerLoop]; + for (int i = 0; i < connsPerLoop; i++) { + sessions[i] = new SessionInfo( + new SocketAddress("1.1.1.1", i + 100), + new SocketAddress(host, port) + ); + } + + int totalConn = 0; + long startTime = System.currentTimeMillis(); + final List<ChannelContext> contexts = new ArrayList<>(); + for (int i = 0; i < loops; i++) { + CountDownLatch activeChannels = new CountDownLatch(connsPerLoop * threadsNum); + CountDownLatch inactiveChannels = new CountDownLatch(connsPerLoop * threadsNum); + EmitterHandler handler = new EmitterHandler() { + + @Override + public void channelActive(ChannelContext context) throws Exception { + if (context != null) { + contexts.add(context); + } else { + System.out.println("Null context."); + } + activeChannels.countDown(); + } + + @Override + public void channelInactive(ChannelContext context) throws Exception { + inactiveChannels.countDown(); + } + + @Override + public void errorOccured(ChannelContext context, Throwable cause) throws Exception { + cause.printStackTrace(); + } + + }; + + for (int j = 0; j < threadsNum; j++) { + NioEmitterWorkerNoExecutors worker = workers[j]; + for (int k = 0; k < connsPerLoop; k++) { + worker.connect(sessions[k], handler); + } + } + + activeChannels.await(); + totalConn += contexts.size(); + + for (ChannelContext context : contexts) { + if (context != null) { + try { + context.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + inactiveChannels.await(); + + contexts.clear(); + } + + System.out.println("time: " + (System.currentTimeMillis() - startTime) + " conns: " + totalConn); + for (int i = 0; i < threadsNum; i++) { + workers[i].setWorking(false); + } + } + + public class SocketThread extends Thread { + + private final Socket socket; + + private final InetSocketAddress remote; + + public SocketThread(InetSocketAddress remote) { + this.socket = new Socket(); + this.remote = remote; + } + + @Override + public void run() { + try { + socket.setReuseAddress(reuseAddress); + socket.setTcpNoDelay(true); + socket.connect(remote); + } catch (Exception e) { + e.printStackTrace(); + } + } + + } + + public static class SocketBunchThread extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(SocketBunchThread.class); + + private int loops = 40; + + private int connNum = 100; + + private int totalConn = 0; + + private long duration = 0; + + boolean reuseAddress = false; + + private InetSocketAddress remote; + + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + Socket[] sockets = new Socket[connNum]; + for (int i = 0; i < loops; i++) { + for (int j = 0; j < connNum; j++) { + LOGGER.debug("Connecting to '{}.'", remote); + Socket socket = new Socket(); + socket.setReuseAddress(reuseAddress); + socket.setTcpNoDelay(true); + socket.connect(remote); + + sockets[j] = socket; + totalConn++; + LOGGER.debug("Connected to '{}.'", remote); + } + + for (int j = 0; j < connNum; j++) { + Socket socket = sockets[j]; + LOGGER.debug("Closing session '{}<->{}.'", socket.getLocalAddress(), remote); + sockets[j].close(); + LOGGER.debug("Closed session '{}<->{}.'", socket.getLocalAddress(), remote); + } + } + + duration = System.currentTimeMillis() - startTime; + } catch (Exception e) { + e.printStackTrace(); + } + + } + + } + + public static class NioThread extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(NioThread.class); + + private int loops = 40; + + private int connNum = 100; + + private int totalConn = 0; + + private long duration = 0; + + boolean reuseAddress = false; + + private InetSocketAddress remote; + + @Override + public void run() { + try { + Selector selector = Selector.open(); + + long startTime = System.currentTimeMillis(); + SocketChannel[] channels = new SocketChannel[connNum]; + for (int i = 0; i < loops; i++) { + for (int j = 0; j < connNum; j++) { + LOGGER.debug("Registering", remote); + SocketChannel channel = SocketChannel.open(); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.configureBlocking(false); + + channel.register(selector, SelectionKey.OP_CONNECT); + try { + channel.connect(remote); + } catch (Exception ex) { + ex.printStackTrace(); + } + + channels[j] = channel; + LOGGER.debug("Registered", remote); + } + + int establishedConn = 0; + int selected = 0; + long selectTimeout = 100; + for (;;) { + try { + selected = selector.select(selectTimeout); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage(), ex); + } + + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isConnectable()) { + LOGGER.debug("Connecting", remote); + SocketChannel channel = (SocketChannel) key.channel(); + + while (!channel.finishConnect()) { + + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + establishedConn++; + LOGGER.debug("Connected", remote); + totalConn++; + //System.out.println(establishedConn); + } + } + } + + if (establishedConn == connNum) { + LOGGER.debug("All connetions established.", remote); + break; + } + } + + for (int j = 0; j < connNum; j++) { + SocketChannel channel = channels[j]; + LOGGER.debug("Closing session."); + channel.close(); + LOGGER.debug("Closed session."); + } + } + + duration = System.currentTimeMillis() - startTime; + } catch (Exception e) { + e.printStackTrace(); + } + + } + + } + + public static class Nio2Thread extends Thread { + + private static final Logger LOGGER = LogManager.getLogger(Nio2Thread.class); + + private int loops = 40; + + private int connNum = 100; + + private int totalConn = 0; + + private long duration = 0; + + boolean reuseAddress = false; + + private InetSocketAddress remote; + + private Selector selector; + + private SelectorThread selectorThread; + + private final AtomicBoolean wakenUp = new AtomicBoolean(); + + private final Object lock = new Object(); + private final Object selectorLock1 = new Object(); + private final Object selectorLock2 = new Object(); + + private volatile boolean allConns = false; + + public Nio2Thread() { + try { + selector = Selector.open(); + selectorThread = new SelectorThread(selector); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + private void wakeUp() { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + @Override + public void run() { + try { + selectorThread.start(); + + long startTime = System.currentTimeMillis(); + SocketChannel[] channels = new SocketChannel[connNum]; + for (int i = 0; i < loops; i++) { + allConns = false; + for (int j = 0; j < connNum; j++) { + LOGGER.debug("SocketChannel.open"); + SocketChannel channel = SocketChannel.open(); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.configureBlocking(false); + channels[j] = channel; + + LOGGER.debug("Registering"); + synchronized (selectorLock2) { + wakeUp(); + synchronized (selectorLock1) { + channel.register(selector, SelectionKey.OP_CONNECT); + } + try { + LOGGER.debug("channel.connect"); + channel.connect(remote); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + LOGGER.debug("Registered", remote); + } + + while (!allConns) { + synchronized (lock) { + lock.wait(50); + } + } + + for (int j = 0; j < connNum; j++) { + SocketChannel channel = channels[j]; + LOGGER.debug("Closing session."); + channel.close(); + LOGGER.debug("Closed session."); + } + + } + + selectorThread.working = false; + selectorThread.selector.wakeup(); + selectorThread.join(); + duration = System.currentTimeMillis() - startTime; + } catch (Exception e) { + e.printStackTrace(); + } + + } + + private class SelectorThread extends Thread { + + private final Selector selector; + + private volatile boolean working = true; + + public SelectorThread(Selector selector) { + this.selector = selector; + } + + @Override + public void run() { + try { + int establishedConn = 0; + int selected = 0; + long selectTimeout = 10; + while (working) { + synchronized (selectorLock1) { + try { + wakenUp.set(false); + selected = selector.select(selectTimeout); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage(), ex); + } + } + + synchronized (selectorLock2) { + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isConnectable()) { + LOGGER.debug("Connecting"); + SocketChannel channel = (SocketChannel) key.channel(); + + try { + while (!channel.finishConnect()) { + + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + establishedConn++; + LOGGER.debug("Connected"); + totalConn++; + } catch (Exception e) { + key.cancel(); + e.printStackTrace(); + } + + //System.out.println(establishedConn); + } + } + } + } + + if (establishedConn >= connNum) { + LOGGER.debug("All connections established.", remote); + synchronized (lock) { + allConns = true; + lock.notifyAll(); + } + + establishedConn = 0; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Fri Oct 13 07:34:51 2017 +0200 @@ -28,9 +28,9 @@ public static final String TYPE = "asynch"; - private long waitTimeout = 100; + private long waitTimeout = 10; - private long windowPeriod = 10; + private long windowPeriod = 1_000; private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue(); @@ -76,6 +76,15 @@ } @Override + public void close() { + super.close(); + + synchronized (lock) { + lock.notifyAll(); + } + } + + @Override protected void closeAllConnections() { if (logger.isDebugEnabled()) { logger.debug("Closing all connections."); @@ -85,8 +94,9 @@ boolean wait; do { wait = false; + //processTimeouts(true); for (HttpFlowContext flowContext : sessions.values()) { - if (flowContext.state == HttpFlowContext.STATE_REQ_SENT) { + if (flowContext.state < HttpFlowContext.STATE_DISCONNECTED) { wait = true; break; } @@ -180,7 +190,7 @@ private void addEvent(Event event, TasksTimeWindow window) { switch (event.getType()) { - case HttpSessionPayloadEvent.TYPE: { + /*case HttpSessionPayloadEvent.TYPE: { Event newEvent = eventInstanceForWorker(event); long time = newEvent.getTimestamp(); HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) newEvent; @@ -199,7 +209,7 @@ } break; - } + }*/ case SessionStatusEvent.TYPE: { Event newEvent = eventInstanceForWorker(event); SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; @@ -222,6 +232,7 @@ @Override public void handle(Event event) { + //System.out.println(event.getTimestamp()); synchronized (readerLock) { while (processWindow) { try { @@ -232,16 +243,16 @@ } } + TasksTimeWindow window = getWindow(event.getTimestamp(), true); + if (currentWindow == null) { + currentWindow = windows.peek(); + } else if (window != currentWindow) { + currentWindow = windows.peek(); + processWindow = true; + } + + addEvent(event, window); synchronized (lock) { - TasksTimeWindow window = getWindow(event.getTimestamp(), true); - if (currentWindow == null) { - currentWindow = window; - } else if (window != currentWindow) { - currentWindow = windows.peek(); - processWindow = true; - } - - addEvent(event, window); if (processWindow) { lock.notifyAll(); } @@ -258,28 +269,44 @@ if (flowContext == null) { connect(statusEvent); } else { - switch (flowContext.state()) { - case HttpFlowContext.STATE_RESP_RECEIVED: - case HttpFlowContext.STATE_CONNECTED: - case HttpFlowContext.STATE_ERROR: - close(statusEvent); - return false; - case HttpFlowContext.STATE_DISCONNECTED: - connect(statusEvent); + synchronized (flowContext) { + if (processTimeout(flowContext)) { return true; - default: - return false; + } + + switch (flowContext.state()) { + case HttpFlowContext.STATE_RESP_RECEIVED: + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_ERROR: + closeSession(statusEvent); + return false; + case HttpFlowContext.STATE_DISCONNECTED: + connect(statusEvent); + return true; + default: + return false; + } } } } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { HttpFlowContext flowContext = flowContext(statusEvent); if (flowContext == null) { return true; - } else if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(statusEvent); - return true; - } else { - return false; + } + + synchronized (flowContext) { + if (processTimeout(flowContext)) { + return true; + } + + if (flowContext.state != HttpFlowContext.STATE_REQ_SENT + && flowContext.state != HttpFlowContext.STATE_CONNECTING + && flowContext.state != HttpFlowContext.STATE_DISCONNECTING) { + closeSession(statusEvent); + return true; + } else { + return false; + } } } @@ -289,26 +316,32 @@ SessionEvent sessEvent = (SessionEvent) event; HttpFlowContext flowContext = flowContext(sessEvent); if (flowContext != null) { - switch (flowContext.state) { - case HttpFlowContext.STATE_CONNECTING: - case HttpFlowContext.STATE_REQ_SENT: - return false; - case HttpFlowContext.STATE_CONNECTED: - case HttpFlowContext.STATE_RESP_RECEIVED: - case HttpFlowContext.STATE_ERROR: - if (send(flowContext, (HttpSessionPayloadEvent) event)) { - return true; - } + synchronized (flowContext) { + if (processTimeout(flowContext)) { + return true; + } - return false; - case HttpFlowContext.STATE_DISCONNECTING: - case HttpFlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - connect(sessEvent); - } else { - return true; - } - break; + switch (flowContext.state) { + case HttpFlowContext.STATE_CONNECTING: + case HttpFlowContext.STATE_REQ_SENT: + return false; + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_RESP_RECEIVED: + case HttpFlowContext.STATE_ERROR: + if (send(flowContext, (HttpSessionPayloadEvent) event)) { + return true; + } + + return false; + case HttpFlowContext.STATE_DISCONNECTING: + case HttpFlowContext.STATE_DISCONNECTED: + if (connectPartialSession) { + connect(sessEvent); + } else { + return true; + } + break; + } } } else if (connectPartialSession) { connect(sessEvent); @@ -320,8 +353,14 @@ SessionEvent sessEvent = (SessionEvent) event; HttpFlowContext flowContext = flowContext(sessEvent); if (flowContext != null) { - return (flowContext.state == HttpFlowContext.STATE_RESP_RECEIVED - || flowContext.state >= HttpFlowContext.STATE_DISCONNECTING); + if (processTimeout(flowContext)) { + return true; + } + + synchronized (flowContext) { + return (flowContext.state == HttpFlowContext.STATE_RESP_RECEIVED + || flowContext.state >= HttpFlowContext.STATE_DISCONNECTING); + } } return true; @@ -333,18 +372,31 @@ @Override public void run() { - - synchronized (lock) { - working = true; - while (working) { + working = true; + while (working) { + synchronized (lock) { try { lock.wait(); + } catch (InterruptedException ignore) { + } + } - boolean dataLoopEnd = false; - boolean dataEnd = false; + if (processWindow) { + long startTime = System.currentTimeMillis(); + int tasksNum = -1; + boolean dataLoopEnd = false; + boolean dataEnd = false; + + try { + + logger.info("processing: " + currentWindow); for (;;) { Iterator<Task> it = currentWindow.tasks.iterator(); + if (tasksNum == -1) { + tasksNum = currentWindow.tasks.size(); + } + while (it.hasNext()) { Task task = it.next(); switch (task.type()) { @@ -385,11 +437,12 @@ break; } else if (!flowStateChanged) { flowStateChanged = false; - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { + synchronized (lock) { + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + } } - } else { flowStateChanged = false; } @@ -399,19 +452,27 @@ closeAllConnections(); } - if (dataEnd) { - working = false; - } } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); } } finally { + long endTime = System.currentTimeMillis() - startTime; + logger.info("{} tasks processed in {} ms.", tasksNum, endTime); + + //System.out.println("remove window: " + currentWindow); removeWindow(currentWindow); + + if (dataEnd) { + working = false; + } + + currentWindow = null; processWindow = false; synchronized (readerLock) { readerLock.notifyAll(); } + } } } @@ -559,13 +620,6 @@ } @Override - public String toString() { - return "TimeWindow{" - + "startTimestamp=" + startTime - + ", endTimestamp=" + endTime + '}'; - } - - @Override public int hashCode() { int hash = 5; hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32)); @@ -586,5 +640,13 @@ && this.endTime == other.endTime; } + @Override + public String toString() { + return "TimeWindow{" + + "startTimestamp=" + startTime + + ", endTimestamp=" + endTime + + ", tasks=" + tasks.size() + '}'; + } + } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Fri Oct 13 07:34:51 2017 +0200 @@ -42,10 +42,10 @@ static { Map<Integer, Long> defaultTimeouts = new HashMap<>(); defaultTimeouts.put(HttpFlowContext.STATE_CONNECTING, 10_000L); - defaultTimeouts.put(HttpFlowContext.STATE_CONNECTED, 60_000L); - defaultTimeouts.put(HttpFlowContext.STATE_REQ_SENT, 30_000L); - defaultTimeouts.put(HttpFlowContext.STATE_RESP_RECEIVED, 60_000L); - defaultTimeouts.put(HttpFlowContext.STATE_ERROR, 60_000L); + defaultTimeouts.put(HttpFlowContext.STATE_CONNECTED, 10_000L); + defaultTimeouts.put(HttpFlowContext.STATE_REQ_SENT, 10_000L); + defaultTimeouts.put(HttpFlowContext.STATE_RESP_RECEIVED, 10_000L); + defaultTimeouts.put(HttpFlowContext.STATE_ERROR, 1_000L); defaultTimeouts.put(HttpFlowContext.STATE_DISCONNECTING, 2_000L); defaultTimeouts.put(HttpFlowContext.STATE_DISCONNECTED, 0L); DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts); @@ -251,18 +251,22 @@ } protected HttpFlowContext connect(SessionInfo session) { - synchronized (lock) { - try { - HttpFlowContext flowContext = register(session); - if (flowContext != null) { + try { + HttpFlowContext flowContext = register(session); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Connecting."); + } + + synchronized (flowContext) { emitter.connect(session, this, index); return flowContext; } - } catch (Exception e) { - logger.error(e.getMessage(), e); } - return null; + } catch (Exception e) { + logger.error(e.getMessage(), e); } + return null; } @Override @@ -271,7 +275,7 @@ for (Map.Entry<SessionInfo, HttpFlowContext> entry : sessions.entrySet()) { HttpFlowContext flowContext = entry.getValue(); try { - closeSession(flowContext); + HttpFlowBasedClientWorker.this.closeSession(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); @@ -284,53 +288,41 @@ } } - protected void close(SessionEvent sessionEvent) { + @Override + public void close(SessionInfo session) { + /*try { + HttpFlowContext flowContext = flowContext(session); + if (flowContext != null) { + synchronized (flowContext) { + closeSession(flowContext); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + }*/ + } + + protected void closeSession(SessionEvent sessionEvent) { close(sessionEvent.getSessionInfo()); } - protected void close(HttpFlowContext flowContext) { - synchronized (lock) { - try { - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - @Override - public void close(SessionInfo session) { - synchronized (lock) { - try { - HttpFlowContext flowContext = flowContext(session); - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - protected void closeSession(HttpFlowContext flowContext) { - synchronized (lock) { - if (flowContext != null) { + if (flowContext != null) { + synchronized (flowContext) { changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING); } } } protected void removeFlowContext(HttpFlowContext flowContext) { - synchronized (lock) { - debug(flowContext, "removeFlowContext"); - sessions.remove(flowContext.sessionInfo()); - } + debug(flowContext, "removeFlowContext"); + sessions.remove(flowContext.sessionInfo()); } protected void reconnect(HttpFlowContext flowContext) { - synchronized (lock) { + synchronized (flowContext) { try { if (logger.isDebugEnabled()) { debug(flowContext, "Reconnect (state: {}).", flowContext.stateString()); @@ -346,10 +338,8 @@ } protected void closeAllConnections() { - synchronized (lock) { - for (HttpFlowContext flowContext : sessions.values()) { - closeSession(flowContext); - } + for (HttpFlowContext flowContext : sessions.values()) { + HttpFlowBasedClientWorker.this.closeSession(flowContext); } } @@ -387,44 +377,54 @@ @Override public void channelActive(ChannelContext context) throws Exception { - synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); - } + HttpFlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + synchronized (flowContext) { flowContext.channelContext = context; changeFlowState(flowContext, STATE_CONNECTED); } + } + synchronized (lock) { lock.notifyAll(); } } @Override public void channelInactive(ChannelContext context) throws Exception { - synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); - } + HttpFlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } + synchronized (flowContext) { changeFlowState(flowContext, STATE_DISCONNECTED); } + } + + synchronized (lock) { lock.notifyAll(); } } @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { + HttpFlowContext flowContext = flowContext(context); + try { + if (flowContext != null) { + synchronized (flowContext) { + if (flowContext.state() >= HttpFlowContext.STATE_DISCONNECTING) { + debug(flowContext, "Data received but flow context in state '{}'.", flowContext.stateString()); + return; + } + HttpFullMessageDecoder decoder = flowContext.decoder; HttpRequest req = null; if (flowContext.sentEvent != null) { @@ -496,43 +496,51 @@ changeFlowState(flowContext, HttpFlowContext.STATE_RESP_RECEIVED); } } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + synchronized (lock) { lock.notifyAll(); } } @Override public void dataWriteStart(ChannelContext context) { - synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent != null) { - long now = timeGenerator.currentTimeMillis(); + HttpFlowContext flowContext = flowContext(context); + if (flowContext != null && flowContext.sentEvent != null) { + long now = timeGenerator.currentTimeMillis(); + synchronized (flowContext) { flowContext.sendStartTimestamp = now; flowContext.sentEvent.getRequest().setTag(TAG_TIME_START, now); } } + } @Override public void dataWritten(ChannelContext context) throws Exception { + HttpFlowContext flowContext = flowContext(context); + + if (flowContext != null) { + synchronized (flowContext) { + if (flowContext.sentEvent != null) { + long now = timeGenerator.currentTimeMillis(); + if (collectMetric) { + synchronized (metric) { + metric.addRequestSendingTime(now - flowContext.sendStartTimestamp); + } + } + + flowContext.sentEvent.getRequest().setTag(TAG_TIME_END, now); + } + } + } + synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent != null) { - long now = timeGenerator.currentTimeMillis(); - if (collectMetric) { - synchronized (metric) { - metric.addRequestSendingTime(now - flowContext.sendStartTimestamp); - } - } - - flowContext.sentEvent.getRequest().setTag(TAG_TIME_END, now); - } - lock.notifyAll(); } @@ -544,99 +552,123 @@ logger.debug("Error occured. " + cause.getMessage(), cause); } + HttpFlowContext flowContext = flowContext(context); + if (flowContext != null) { + synchronized (flowContext) { + closeSession(flowContext); + } + } + synchronized (lock) { - HttpFlowContext flowContext = flowContext(context); - if (flowContext != null) { - changeFlowState(flowContext, HttpFlowContext.STATE_ERROR); - } - lock.notifyAll(); } } - protected boolean send(HttpFlowContext context, HttpSessionPayloadEvent event) { - synchronized (lock) { + protected boolean send(HttpFlowContext flowContext, HttpSessionPayloadEvent event) { + int reqSize = -1; + boolean res = false; + synchronized (flowContext) { + if (flowContext.state() < HttpFlowContext.STATE_CONNECTED + || flowContext.state() >= HttpFlowContext.STATE_DISCONNECTING) { + debug(flowContext, "Data sending attempt but flow context in state '{}'.", flowContext.stateString()); + return false; + } + if (event.getRequest() != null) { HttpRequest req = event.getRequest(); - if (filterChain.filterOutbound(req, event.getResponse(), context) == HttpFilter.DENY) { + if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == HttpFilter.DENY) { return false; } - reqEncoder.encodeHeader(req, context.buffer); - long headerSize = context.buffer.readableBytes(); - reqEncoder.encodeContent(req, context.buffer); + reqEncoder.encodeHeader(req, flowContext.buffer); + long headerSize = flowContext.buffer.readableBytes(); + reqEncoder.encodeContent(req, flowContext.buffer); req.setTag(TAG_HEADER_SIZE, headerSize); - req.setTag(TAG_CONTENT_SIZE, (long) (context.buffer.readableBytes() - headerSize)); + req.setTag(TAG_CONTENT_SIZE, (long) (flowContext.buffer.readableBytes() - headerSize)); - if (collectMetric) { - synchronized (metric) { - metric.incRequestsNum(); - metric.addRequestSize(context.buffer.readableBytes()); - //metric.addRequestUrl(req.getUrl()); - } - } + reqSize = flowContext.buffer.readableBytes(); try { - changeFlowState(context, HttpFlowContext.STATE_REQ_SENT); - context.sentEvent = event; + changeFlowState(flowContext, HttpFlowContext.STATE_REQ_SENT); + flowContext.sentEvent = event; - context.channelContext.writeAndFlush(context.buffer); + flowContext.channelContext.writeAndFlush(flowContext.buffer); if (logger.isDebugEnabled()) { - debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer.length()); + debug(flowContext, "Request '{}' sending ({} bytes).", req.getUrl(), flowContext.buffer.length()); } - context.buffer.clear(); + flowContext.buffer.clear(); - return true; + res = true; } catch (Exception e) { if (logger.isDebugEnabled()) { - debug(context, e.getMessage(), e); + debug(flowContext, e.getMessage(), e); } } } } - return false; + if (collectMetric && reqSize >= 0) { + synchronized (metric) { + metric.incRequestsNum(); + metric.addRequestSize(reqSize); + //metric.addRequestUrl(req.getUrl()); + } + } + + return res; } protected void processTimeouts() { - synchronized (lock) { - try { - long now = timeGenerator.currentTimeMillis(); - if (nextCheckTimeoutsTime == -1) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - } else if (nextCheckTimeoutsTime > now) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - for (HttpFlowContext flowContext : sessions.values()) { - if (flowContext.timeouted()) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow for session '{}' timed out (state '{}').", - flowContext.sessionInfo(), flowContext.stateString()); - } + processTimeouts(false); + } - switch (flowContext.state) { - case HttpFlowContext.STATE_CONNECTING: - case HttpFlowContext.STATE_CONNECTED: - case HttpFlowContext.STATE_REQ_SENT: - case HttpFlowContext.STATE_ERROR: - closeSession(flowContext); - break; - case HttpFlowContext.STATE_RESP_RECEIVED: - //Dziwny blad nie powinien wystepowac - break; - case HttpFlowContext.STATE_DISCONNECTING: - case HttpFlowContext.STATE_DISCONNECTED: - removeFlowContext(flowContext); - break; - } - } + protected boolean processTimeout(HttpFlowContext flowContext) { + if (flowContext.timeouted()) { + info(flowContext, "Session timed out (state '{}').", + flowContext.sessionInfo(), flowContext.stateString()); + if (logger.isDebugEnabled()) { + debug(flowContext, "Session timed out (state '{}').", + flowContext.sessionInfo(), flowContext.stateString()); + } + + switch (flowContext.state) { + case HttpFlowContext.STATE_CONNECTING: + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_REQ_SENT: + case HttpFlowContext.STATE_ERROR: + case HttpFlowContext.STATE_RESP_RECEIVED: + closeSession(flowContext); + break; + case HttpFlowContext.STATE_DISCONNECTING: + case HttpFlowContext.STATE_DISCONNECTED: + removeFlowContext(flowContext); + break; + } + + return true; + } + + return false; + } + + protected void processTimeouts(boolean force) { + try { + long now = timeGenerator.currentTimeMillis(); + if (nextCheckTimeoutsTime == -1 && !force) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + } else if (nextCheckTimeoutsTime > now || force) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + for (HttpFlowContext flowContext : sessions.values()) { + synchronized (flowContext) { + processTimeout(flowContext); } } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); } } } @@ -659,6 +691,10 @@ log(flowContext, Level.DEBUG, message, cause); } + protected final void info(HttpFlowContext flowContext, String message, Object... args) { + log(flowContext, Level.INFO, message, args); + } + protected final void error(HttpFlowContext flowContext, String message, Throwable cause) { log(flowContext, Level.ERROR, message, cause); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpParallelClientWorker.java Fri Oct 13 07:34:51 2017 +0200 @@ -122,7 +122,7 @@ LocalHttpFlowContext indexFlowContext = it.next(); if (indexFlowContext.eventsQueue.isEmpty() && indexFlowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(flowContext); + closeSession(flowContext); if (--diff == 0) { break; } @@ -154,7 +154,7 @@ if (localFlowContext.state < HttpFlowContext.STATE_DISCONNECTING && localFlowContext.state != HttpFlowContext.STATE_REQ_SENT && localFlowContext.eventsQueue.isEmpty()) { - close(flowContext); + closeSession(flowContext); return; } } @@ -169,7 +169,7 @@ SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { localFlowContext.eventsQueue.poll(); - close((SessionStatusEvent) event); + closeSession((SessionStatusEvent) event); } } else if (event.getType() == HttpSessionPayloadEvent.TYPE && canSend(flowContext)) { @@ -203,7 +203,7 @@ if (flowContext != null) { if (flowContext.eventsQueue.isEmpty() && flowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(statusEvent); + closeSession(statusEvent); } else { addToQueue(flowContext, event); }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Fri Oct 13 07:34:51 2017 +0200 @@ -160,7 +160,7 @@ currFlowContext = flowContext((SessionEvent) event); if (currFlowContext != null) { if (currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(statusEvent); + closeSession(statusEvent); } } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Fri Oct 13 07:34:51 2017 +0200 @@ -67,7 +67,7 @@ this.srcIp = srcIp; this.dstIp = dstIp; this.sessionStatus = 0; - this.hashCode = (srcPort + dstPort << 16) ^ srcIp.hashCode() ^ dstIp.hashCode(); + this.hashCode = (srcPort + (dstPort << 16)) ^ srcIp.hashCode() ^ dstIp.hashCode(); } public String getSourceName() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContextNoExecutors.java Fri Oct 13 07:34:51 2017 +0200 @@ -0,0 +1,108 @@ +package com.passus.st.emitter.nio; + +import com.passus.data.ByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.SessionInfo; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.Queue; + +/** + * + * @author Mirosław Hawrot + */ +public class NioChannelContextNoExecutors implements ChannelContext { + + private final NioEmitterWorkerNoExecutors worker; + + private final SessionInfo sessionInfo; + + private final SocketChannel channel; + + private final Queue<ByteBuffer> dataQueue; + + private SocketAddress localAddress; + + private SocketAddress remoteAddress; + + private SelectionKey key; + + public NioChannelContextNoExecutors(NioEmitterWorkerNoExecutors worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { + this.worker = worker; + this.channel = channel; + this.remoteAddress = remoteAddress; + this.sessionInfo = sessionInfo; + this.dataQueue = new LinkedList<>(); + + } + + Queue<ByteBuffer> dataQueue() { + return dataQueue; + } + + void selectionKey(SelectionKey key) { + this.key = key; + } + + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return channel.isConnectionPending(); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + addToQeueu(ByteBuffer.wrap(data, offset, length)); + } + + @Override + public void write(ByteBuff data) throws IOException { + addToQeueu(data.toNioByteBuffer()); + } + + @Override + public void flush() { + worker.flush(key); + } + + @Override + public void close() throws IOException { + worker.requestClose(key); + } + + @Override + public SocketAddress getLocalAddress() { + try { + if (localAddress == null && channel.getLocalAddress() != null) { + localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress()); + } + } catch (Exception e) { + } + + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Mon Oct 09 10:02:28 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Fri Oct 13 07:34:51 2017 +0200 @@ -55,7 +55,9 @@ private EmitterMetric metric; - NioEmitterWorker(int index) throws IOException { + private ExecutorService exec = Executors.newSingleThreadExecutor(); + + public NioEmitterWorker(int index) throws IOException { super("NioEmitterWorker-" + index); selector = Selector.open(); } @@ -89,17 +91,6 @@ this.sessionMapper = sessionMapper; } - public void setWorking(boolean working) { - this.working = working; - if (this.working && !working) { - try { - selector.close(); - } catch (IOException ex) { - LOGGER.warn(ex.getMessage(), ex); - } - } - } - @Override public boolean isCollectMetrics() { return collectMetrics; @@ -135,6 +126,34 @@ return working; } + public void setWorking(boolean working) { + if (this.working && !working) { + this.working = false; + try { + selector.close(); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage(), ex); + } + + if (!tasks.isEmpty()) { + LOGGER.debug("{} tasks remains.", tasks.size()); + } + + tasks.clear(); + if (exec != null) { + try { + exec.shutdownNow(); + exec.awaitTermination(200, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Shutdowned."); + } + } + } + @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { @@ -268,10 +287,18 @@ LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); } - keyContext.handler.channelActive(keyContext.channelContext); + exec.submit(() -> { + try { + keyContext.handler.channelActive(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + }); + setOpRead(key); } catch (Exception ex) { - + LOGGER.debug(ex.getMessage(), ex); } } @@ -283,7 +310,10 @@ keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - keyContext.handler.dataWriteStart(keyContext.channelContext); + exec.submit(() -> { + keyContext.handler.dataWriteStart(keyContext.channelContext); + }); + Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); int written = 0; try { @@ -295,17 +325,7 @@ if (res == -1) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Session ({} -> {}) closed by serwer.", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Session ({} -> {}) closed by serwer.", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Session ({} -> {}) closed by serwer.", + LOGGER.debug("Session ({} -> {}) closed by server.", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } @@ -332,12 +352,14 @@ } //TODO Operacje na handlerach powinny przechodzic przez Executor - try { - keyContext.handler.dataWritten(keyContext.channelContext); - LOGGER.debug("Write handled."); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + exec.submit(() -> { + try { + keyContext.handler.dataWritten(keyContext.channelContext); + LOGGER.debug("Write handled."); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + }); setOpRead(key); clearOpWrite(key); @@ -382,27 +404,28 @@ } if (totalReaded > 0) { - try { - keyContext.handler.dataReceived(keyContext.channelContext, buff); - LOGGER.debug("Read handled."); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + exec.submit(() -> { + try { + keyContext.handler.dataReceived(keyContext.channelContext, buff); + LOGGER.debug("Read handled."); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + }); } if (readed == -1) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Session ({} -> {}) closed by serwer.", + LOGGER.debug("Session ({} -> {}) closed by server.", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - } - if (readed == -1) { doClose(key); return; } keyContext.buffer.flip(); + key.selector().wakeup(); } private void doCatchException(SelectionKey key, Throwable cause) { @@ -417,43 +440,45 @@ } KeyContext keyContext = (KeyContext) key.attachment(); - - try { - keyContext.handler.errorOccured(keyContext.channelContext, cause); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + exec.submit(() -> { + try { + keyContext.handler.errorOccured(keyContext.channelContext, cause); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + }); } private void doClose(SelectionKey key) { - if (!key.channel().isOpen()) { - selector.wakeup(); - return; - } - - try { - key.channel().close(); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); + KeyContext keyContext = (KeyContext) key.attachment(); + if (key.channel().isOpen()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + try { + key.channel().close(); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } } - KeyContext keyContext = (KeyContext) key.attachment(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + if (key.isValid()) { + key.cancel(); } - try { - keyContext.handler.channelInactive(keyContext.channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + exec.submit(() -> { + try { + keyContext.handler.channelInactive(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } - key.cancel(); - try { - keyContext.handler.channelUnregistered(keyContext.channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + try { + keyContext.handler.channelUnregistered(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + }); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerNoExecutors.java Fri Oct 13 07:34:51 2017 +0200 @@ -0,0 +1,770 @@ +package com.passus.st.emitter.nio; + +import com.passus.st.emitter.SessionMapper; +import com.passus.commons.Assert; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.EmitterMetric; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricSource; +import com.passus.st.metric.MetricsContainer; +import java.io.IOException; +import java.net.ConnectException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author Mirosław Hawrot + */ +public class NioEmitterWorkerNoExecutors extends Thread implements MetricSource { + + private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class); + + private int index; + + private final Selector selector; + + private volatile boolean working = false; + + private long selectTimeout = 100; + + private Queue<Task> tasks = new ConcurrentLinkedQueue<>(); + + private SessionMapper sessionMapper; + + private volatile boolean collectMetrics = false; + + private long connectionTimeout = 5_000; + + private EmitterMetric metric; + + private boolean reusePort = true; + + private boolean tcpNoDelay = true; + + private final AtomicBoolean wakeUp = new AtomicBoolean(); + + public NioEmitterWorkerNoExecutors(int index) throws IOException { + super("NioEmitterWorker-" + index); + selector = Selector.open(); + } + + int getIndex() { + return index; + } + + public boolean isReusePort() { + return reusePort; + } + + public void setReusePort(boolean reusePort) { + this.reusePort = reusePort; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public long getSelectTimeout() { + return selectTimeout; + } + + public void setSelectTimeout(long selectTimeout) { + Assert.greaterThanZero(selectTimeout, "selectTimeout"); + this.selectTimeout = selectTimeout; + } + + public long getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(long connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + if (collectMetrics && metric == null) { + metric = new EmitterMetric(); + try { + metric.activate(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Error occured during metric activation. " + e.getMessage(), e); + } + } + this.collectMetrics = true; + } else if (!collectMetrics && metric != null) { + this.collectMetrics = false; + try { + metric.deactivate(); + } catch (Exception e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e); + } + } + + metric = null; + } + } + + public boolean isWorking() { + return working; + } + + public void setWorking(boolean working) { + if (this.working && !working) { + this.working = false; + tasks.clear(); + wakeUp(); + + try { + selector.close(); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage(), ex); + } + + interrupt(); + + try { + join(); + } catch (Exception e) { + } + + } + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { + tasks.add(new ConnectTask(sessionInfo, handler)); + selector.wakeup(); + } + + void flush(SelectionKey key) { + tasks.add(new FlushTask(key)); + selector.wakeup(); + } + + private void doRegister(SessionInfo sessionInfo, EmitterHandler handler) { + try { + ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unable to map session '" + sessionInfo + "'."); + } + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'."); + } + + SocketChannel channel = SocketChannel.open(); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, reusePort); + channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); + channel.configureBlocking(false); + + SocketAddress bindAddress = connParams.getBindAddress(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + NioChannelContextNoExecutors channelContext = new NioChannelContextNoExecutors(this, channel, remoteAddress, sessionInfo); + KeyContext keyContext = new KeyContext(channelContext, handler); + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); + + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(key, ex); + } + + channelContext.selectionKey(key); + try { + channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); + } catch (Exception ex) { + doCatchException(key, ex); + doClose(key); + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Session registered '{}'.", sessionInfo); + } + + wakeUp(); + } catch (Exception e) { + if (collectMetrics) { + metric.errorCaught(e); + } + LOGGER.error(e.getMessage(), e); + } + + } + + private void doFinishConnect(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connecting to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + try { + long connStart = System.currentTimeMillis(); + boolean timeouted = false; + while (!channel.finishConnect()) { + long now = System.currentTimeMillis(); + if (now - connStart < connectionTimeout) { + timeouted = true; + break; + } + } + + clearOpConnect(key); + if (timeouted) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection '{}' timed out.", keyContext.channelContext.getRemoteAddress()); + } + + throw new ConnectException("Connection timed out."); + + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + if (collectMetrics) { + metric.incConnectionsErrors(); + } + + return; + } + + try { + if (collectMetrics) { + metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); + metric.addBindSocket(keyContext.channelContext.getLocalAddress()); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + keyContext.handler.channelActive(keyContext.channelContext); + setOpRead(key); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + } + } + + private void doWrite(SelectionKey key) { + SocketChannel socketChannel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); + int written = 0; + try { + ByteBuffer buffer; + while (!queue.isEmpty()) { + buffer = queue.poll(); + while (buffer.hasRemaining()) { + int res = socketChannel.write(buffer); + + if (res == -1) { + doClose(key); + return; + } + + written += res; + } + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateSentBytes(written); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Written '" + written + "' to '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + //TODO Operacje na handlerach powinny przechodzic przez Executor + try { + keyContext.handler.dataWritten(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + setOpRead(key); + clearOpWrite(key); + } + + private int read(SocketChannel channel, ByteBuffer buffer, ByteBuff out) throws IOException { + int readed; + int totalReaded = 0; + while ((readed = channel.read(buffer)) > 0) { + buffer.flip(); + out.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + if (readed < 0) { + return -1; + } + + totalReaded += readed; + } + + return totalReaded; + } + + private void doRead(SelectionKey key) { + SocketChannel channel = (SocketChannel) key.channel(); + KeyContext keyContext = (KeyContext) key.attachment(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Reading from '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + ByteBuffer buffer = keyContext.buffer; + + keyContext.buffer.clear(); + + ByteBuff buff = new HeapByteBuff(); + int totalReaded = 0; + int readed; + try { + while ((readed = channel.read(buffer)) > 0) { + buffer.flip(); + buff.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + totalReaded += readed; + } + } catch (IOException e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateReceivedBytes(totalReaded); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(totalReaded + " readed from '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + try { + keyContext.handler.dataReceived(keyContext.channelContext, buff); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (readed == -1) { + doClose(key); + return; + } + + keyContext.buffer.flip(); + } + + private void doCatchException(SelectionKey key, Throwable cause) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Error occured. " + cause.getMessage(), cause); + } + + if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + } + + KeyContext keyContext = (KeyContext) key.attachment(); + + try { + keyContext.handler.errorOccured(keyContext.channelContext, cause); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + } + + private void doClose(SelectionKey key) { + KeyContext keyContext = (KeyContext) key.attachment(); + if (key.channel().isOpen()) { + try { + key.channel().close(); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + try { + keyContext.handler.channelInactive(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (key.isValid()) { + key.cancel(); + } + + try { + keyContext.handler.channelUnregistered(keyContext.channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + } + + private void sslDoHandshake(SocketChannel socketChannel, SSLEngine engine, + ByteBuffer myNetData, ByteBuffer peerNetData) throws Exception { + + // Create byte buffers to use for holding application data + int appBufferSize = engine.getSession().getApplicationBufferSize(); + ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize); + ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize); + + // Begin handshake + engine.beginHandshake(); + SSLEngineResult.HandshakeStatus hs = engine.getHandshakeStatus(); + + // Process handshaking message + while (hs != SSLEngineResult.HandshakeStatus.FINISHED + && hs != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { + switch (hs) { + case NEED_UNWRAP: + // Receive handshaking data from peer + if (socketChannel.read(peerNetData) < 0) { + // Handle closed channel + } + + // Process incoming handshaking data + peerNetData.flip(); + SSLEngineResult res = engine.unwrap(peerNetData, peerAppData); + peerNetData.compact(); + hs = res.getHandshakeStatus(); + + switch (res.getStatus()) { + case OK: + // Handle OK status + break; + case BUFFER_UNDERFLOW: + break; + case BUFFER_OVERFLOW: + break; + case CLOSED: + break; + // Handle other status: BUFFER_UNDERFLOW, BUFFER_OVERFLOW, CLOSED + } + break; + + case NEED_WRAP: + // Empty the local network packet buffer. + myNetData.clear(); + + // Generate handshaking data + res = engine.wrap(myAppData, myNetData); + hs = res.getHandshakeStatus(); + + // Check status + switch (res.getStatus()) { + case OK: + myNetData.flip(); + + // Send the handshaking data to peer + while (myNetData.hasRemaining()) { + if (socketChannel.write(myNetData) < 0) { + // Handle closed channel + } + } + break; + + // Handle other status: BUFFER_OVERFLOW, BUFFER_UNDERFLOW, CLOSED + } + break; + + case NEED_TASK: + // Handle blocking tasks + break; + + // Handle other status: // FINISHED or NOT_HANDSHAKING + } + } + // Processes after handshaking + } + + private void sslDoWrap(SocketChannel socketChannel) { + + } + + private void sslDoUnwrap(SocketChannel socketChannel, SSLEngine engine, ByteBuffer netData, ByteBuffer appData) { + SSLEngineResult res; + try { + res = engine.unwrap(netData, appData); + } catch (Exception e) { + return; + } + + switch (res.getStatus()) { + case OK: + break; + case CLOSED: + break; + case BUFFER_OVERFLOW: + // Maybe need to enlarge the peer application data buffer. + if (engine.getSession().getApplicationBufferSize() + > appData.capacity()) { + // enlarge the peer application data buffer + } else { + // compact or clear the buffer + } + // retry the operation + break; + + case BUFFER_UNDERFLOW: + // Maybe need to enlarge the peer network packet buffer + if (engine.getSession().getPacketBufferSize() + > netData.capacity()) { + // enlarge the peer network packet buffer + } else { + // compact or clear the buffer + } + // obtain more inbound network data and then retry the operation + break; + } + } + + private void wakeUp() { + if (wakeUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + private void clearOpConnect(SelectionKey key) { + if (!key.isValid() || !key.isConnectable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + key.selector().wakeup(); + } + + private void setOpRead(SelectionKey key) { + if (!key.isValid() || key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + wakeUp(); + } + + private void clearOpRead(SelectionKey key) { + if (!key.isValid() || !key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + } + + private void setOpWrite(SelectionKey key) { + if (!key.isValid() || key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + wakeUp(); + } + + private void clearOpWrite(SelectionKey key) { + if (!key.isValid() || !key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + } + + void requestClose(SelectionKey key) { + tasks.add(new CloseTask(key)); + wakeUp(); + } + + @Override + public void run() { + int selected = 0; + working = true; + while (working) { + if (!tasks.isEmpty()) { + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + } else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doRegister(taskConn.sessionInfo, taskConn.handler); + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); + } + } + } + + try { + selected = selector.select(selectTimeout); + } catch (IOException ex) { + LOGGER.warn(ex.getMessage(), ex); + } + + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isConnectable()) { + doFinishConnect(key); + } else if (key.isWritable()) { + doWrite(key); + } else if (key.isReadable()) { + doRead(key); + } + } + } + } + } + + private static class KeyContext { + + private final EmitterHandler handler; + + private final NioChannelContextNoExecutors channelContext; + + private ByteBuffer buffer = ByteBuffer.allocate(1024); + + public KeyContext(NioChannelContextNoExecutors channelContext, EmitterHandler handler) { + this.channelContext = channelContext; + this.handler = handler; + } + + } + + private static abstract class Task { + + public static final int CLOSE = 1; + public static final int CONNECT = 2; + public static final int FLUSH = 3; + + private final int code; + + public Task(int code) { + this.code = code; + } + + } + + private final static class ConnectTask extends Task { + + private final SessionInfo sessionInfo; + private final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { + super(CONNECT); + this.sessionInfo = sessionInfo; + this.handler = handler; + } + + } + + private final static class CloseTask extends Task { + + private final SelectionKey key; + + public CloseTask(SelectionKey key) { + super(CLOSE); + this.key = key; + } + + } + + private final static class FlushTask extends Task { + + private final SelectionKey key; + + public FlushTask(SelectionKey key) { + super(FLUSH); + this.key = key; + } + + } +}