changeset 608:c4e1b90cb412 http-asynch-worker

init branch
author Devel 2
date Mon, 09 Oct 2017 10:02:28 +0200
parents 765556dd7c80
children 811aa52e7ebe
files stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java
diffstat 4 files changed, 72 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Mon Oct 09 09:58:55 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
@@ -75,15 +75,12 @@
         this.responseSynch = responseSynch;
     }
 
-    private void waitQuietly() {
-        try {
-            lock.wait(waitTimeout);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
     @Override
     protected void closeAllConnections() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing all connections.");
+        }
+
         synchronized (lock) {
             boolean wait;
             do {
@@ -206,8 +203,10 @@
             case SessionStatusEvent.TYPE: {
                 Event newEvent = eventInstanceForWorker(event);
                 SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent;
-                SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true);
-                task.events.add(statusEvent);
+                if (statusEvent.getStatus() != SessionStatusEvent.STATUS_CLOSING) {
+                    SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true);
+                    task.events.add(statusEvent);
+                }
                 break;
             }
             case DataLoopEnd.TYPE:
@@ -255,13 +254,32 @@
             case SessionStatusEvent.TYPE: {
                 SessionStatusEvent statusEvent = (SessionStatusEvent) event;
                 if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
-                    connect(statusEvent);
+                    HttpFlowContext flowContext = flowContext(statusEvent);
+                    if (flowContext == null) {
+                        connect(statusEvent);
+                    } else {
+                        switch (flowContext.state()) {
+                            case HttpFlowContext.STATE_RESP_RECEIVED:
+                            case HttpFlowContext.STATE_CONNECTED:
+                            case HttpFlowContext.STATE_ERROR:
+                                close(statusEvent);
+                                return false;
+                            case HttpFlowContext.STATE_DISCONNECTED:
+                                connect(statusEvent);
+                                return true;
+                            default:
+                                return false;
+                        }
+                    }
                 } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     HttpFlowContext flowContext = flowContext(statusEvent);
-                    if (flowContext != null) {
-                        if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
-                            close(statusEvent);
-                        }
+                    if (flowContext == null) {
+                        return true;
+                    } else if (flowContext.state != HttpFlowContext.STATE_REQ_SENT) {
+                        close(statusEvent);
+                        return true;
+                    } else {
+                        return false;
                     }
                 }
 
@@ -281,6 +299,8 @@
                             if (send(flowContext, (HttpSessionPayloadEvent) event)) {
                                 return true;
                             }
+
+                            return false;
                         case HttpFlowContext.STATE_DISCONNECTING:
                         case HttpFlowContext.STATE_DISCONNECTED:
                             if (connectPartialSession) {
@@ -313,6 +333,7 @@
 
     @Override
     public void run() {
+
         synchronized (lock) {
             working = true;
             while (working) {
@@ -335,10 +356,12 @@
                                         return;
                                     }
 
-                                    if (!sessionTask.events.isEmpty()) {
+                                    while (!sessionTask.events.isEmpty()) {
                                         Event event = sessionTask.events.get(0);
                                         if (processSessionEvent((SessionEvent) event)) {
                                             sessionTask.events.remove(0);
+                                        } else {
+                                            break;
                                         }
                                     }
 
@@ -361,7 +384,12 @@
                         if (currentWindow.tasks.isEmpty()) {
                             break;
                         } else if (!flowStateChanged) {
-                            waitQuietly();
+                            flowStateChanged = false;
+                            try {
+                                lock.wait(waitTimeout);
+                            } catch (InterruptedException ignore) {
+                            }
+
                         } else {
                             flowStateChanged = false;
                         }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Mon Oct 09 09:58:55 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Mon Oct 09 10:02:28 2017 +0200
@@ -217,8 +217,8 @@
     protected HttpFlowContext flowContext(SessionInfo session) {
         HttpFlowContext context = sessions.get(session);
         if (context == null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Context for session '" + session + "' not found.");
+            if (logger.isTraceEnabled()) {
+                logger.trace("Context for session '{}' not found.", session);
             }
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Oct 09 09:58:55 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Oct 09 10:02:28 2017 +0200
@@ -23,6 +23,9 @@
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -268,7 +271,7 @@
             keyContext.handler.channelActive(keyContext.channelContext);
             setOpRead(key);
         } catch (Exception ex) {
-            LOGGER.error(ex.getMessage(), ex);
+
         }
     }
 
@@ -291,6 +294,21 @@
                     int res = socketChannel.write(buffer);
 
                     if (res == -1) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+                        }
+
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+                        }
+
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+                        }
+
                         doClose(key);
                         return;
                     }
@@ -373,6 +391,13 @@
         }
 
         if (readed == -1) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Session ({} -> {}) closed by serwer.",
+                        keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+            }
+        }
+
+        if (readed == -1) {
             doClose(key);
             return;
         }
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java	Mon Oct 09 09:58:55 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpAsynchClientWorkerTest.java	Mon Oct 09 10:02:28 2017 +0200
@@ -6,7 +6,6 @@
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
 import com.passus.commons.service.ServiceUtils;
 import com.passus.st.AbstractWireMockTest;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionEvent;
 import com.passus.st.client.SessionStatusEvent;