Mercurial > stress-tester
changeset 608:c4e1b90cb412 http-asynch-worker
init branch
author | Devel 2 |
---|---|
date | Mon, 09 Oct 2017 10:02:28 +0200 |
parents | 765556dd7c80 |
children | 811aa52e7ebe |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java |
diffstat | 4 files changed, 72 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Mon Oct 09 09:58:55 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java Mon Oct 09 10:02:28 2017 +0200 @@ -75,15 +75,12 @@ this.responseSynch = responseSynch; } - private void waitQuietly() { - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { - } - } - @Override protected void closeAllConnections() { + if (logger.isDebugEnabled()) { + logger.debug("Closing all connections."); + } + synchronized (lock) { boolean wait; do { @@ -206,8 +203,10 @@ case SessionStatusEvent.TYPE: { Event newEvent = eventInstanceForWorker(event); SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; - SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true); - task.events.add(statusEvent); + if (statusEvent.getStatus() != SessionStatusEvent.STATUS_CLOSING) { + SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true); + task.events.add(statusEvent); + } break; } case DataLoopEnd.TYPE: @@ -255,13 +254,32 @@ case SessionStatusEvent.TYPE: { SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - connect(statusEvent); + HttpFlowContext flowContext = flowContext(statusEvent); + if (flowContext == null) { + connect(statusEvent); + } else { + switch (flowContext.state()) { + case HttpFlowContext.STATE_RESP_RECEIVED: + case HttpFlowContext.STATE_CONNECTED: + case HttpFlowContext.STATE_ERROR: + close(statusEvent); + return false; + case HttpFlowContext.STATE_DISCONNECTED: + connect(statusEvent); + return true; + default: + return false; + } + } } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { HttpFlowContext flowContext = flowContext(statusEvent); - if (flowContext != null) { - if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { - close(statusEvent); - } + if (flowContext == null) { + return true; + } else if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) { + close(statusEvent); + return true; + } else { + return false; } } @@ -281,6 +299,8 @@ if (send(flowContext, (HttpSessionPayloadEvent) event)) { return true; } + + return false; case HttpFlowContext.STATE_DISCONNECTING: case HttpFlowContext.STATE_DISCONNECTED: if (connectPartialSession) { @@ -313,6 +333,7 @@ @Override public void run() { + synchronized (lock) { working = true; while (working) { @@ -335,10 +356,12 @@ return; } - if (!sessionTask.events.isEmpty()) { + while (!sessionTask.events.isEmpty()) { Event event = sessionTask.events.get(0); if (processSessionEvent((SessionEvent) event)) { sessionTask.events.remove(0); + } else { + break; } } @@ -361,7 +384,12 @@ if (currentWindow.tasks.isEmpty()) { break; } else if (!flowStateChanged) { - waitQuietly(); + flowStateChanged = false; + try { + lock.wait(waitTimeout); + } catch (InterruptedException ignore) { + } + } else { flowStateChanged = false; }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Mon Oct 09 09:58:55 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Mon Oct 09 10:02:28 2017 +0200 @@ -217,8 +217,8 @@ protected HttpFlowContext flowContext(SessionInfo session) { HttpFlowContext context = sessions.get(session); if (context == null) { - if (logger.isDebugEnabled()) { - logger.debug("Context for session '" + session + "' not found."); + if (logger.isTraceEnabled()) { + logger.trace("Context for session '{}' not found.", session); } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Mon Oct 09 09:58:55 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Mon Oct 09 10:02:28 2017 +0200 @@ -23,6 +23,9 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -268,7 +271,7 @@ keyContext.handler.channelActive(keyContext.channelContext); setOpRead(key); } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); + } } @@ -291,6 +294,21 @@ int res = socketChannel.write(buffer); if (res == -1) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Session ({} -> {}) closed by serwer.", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Session ({} -> {}) closed by serwer.", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Session ({} -> {}) closed by serwer.", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + doClose(key); return; } @@ -373,6 +391,13 @@ } if (readed == -1) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Session ({} -> {}) closed by serwer.", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + } + + if (readed == -1) { doClose(key); return; }
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java Mon Oct 09 09:58:55 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java Mon Oct 09 10:02:28 2017 +0200 @@ -6,7 +6,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import com.passus.commons.service.ServiceUtils; import com.passus.st.AbstractWireMockTest; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.client.Event; import com.passus.st.client.SessionEvent; import com.passus.st.client.SessionStatusEvent;