changeset 535:aef8a6e0958e

HttpAsynchClientWorker - performance improvements
author Devel 2
date Mon, 11 Sep 2017 11:12:49 +0200
parents 91d608d00bca
children 2d655c494e41
files stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java
diffstat 1 files changed, 31 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Fri Sep 08 13:51:10 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Mon Sep 11 11:12:49 2017 +0200
@@ -36,7 +36,9 @@
 
     private TasksTimeWindow currentWindow;
 
-    private boolean processWindow = false;
+    private volatile boolean processWindow = false;
+
+    private final Object readerLock = new Object();
 
     public HttpAsynchClientWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
@@ -100,7 +102,9 @@
     }
 
     private void removeWindow(TasksTimeWindow window) {
-        windows.remove(window);
+        if (window != null) {
+            windows.remove(window);
+        }
     }
 
     private TasksTimeWindow createWindow(long time) {
@@ -187,26 +191,31 @@
 
     @Override
     public void handle(Event event) {
-        synchronized (lock) {
+        synchronized (readerLock) {
             while (processWindow) {
                 try {
-                    lock.wait(waitTimeout);
+                    readerLock.wait();
                 } catch (InterruptedException ignore) {
 
                 }
             }
         }
-        
-        TasksTimeWindow window = getWindow(event.getTimestamp(), true);
-        if (currentWindow == null) {
-            currentWindow = window;
-        } else if (window != currentWindow) {
-            currentWindow = windows.peek();
-            processWindow = true;
+
+        synchronized (lock) {
+            TasksTimeWindow window = getWindow(event.getTimestamp(), true);
+            if (currentWindow == null) {
+                currentWindow = window;
+            } else if (window != currentWindow) {
+                currentWindow = windows.peek();
+                processWindow = true;
+            }
+
+            addEvent(event, window);
+            if (processWindow) {
+                lock.notifyAll();
+            }
         }
 
-        addEvent(event, window);
-
     }
 
     private boolean processSessionEvent(SessionEvent event) {
@@ -231,13 +240,14 @@
                 HttpFlowContext flowContext = flowContext(sessEvent);
                 if (flowContext != null) {
                     switch (flowContext.state) {
+                        case HttpFlowContext.STATE_CONNECTING:
+                            return false;
                         case HttpFlowContext.STATE_CONNECTED:
                         case HttpFlowContext.STATE_RESP_RECEIVED:
                         case HttpFlowContext.STATE_ERROR:
                             if (send(flowContext, (HttpSessionPayloadEvent) event)) {
                                 return true;
                             }
-                            break;
                         case HttpFlowContext.STATE_DISCONNECTING:
                         case HttpFlowContext.STATE_DISCONNECTED:
                             if (connectPartialSession) {
@@ -274,19 +284,13 @@
             working = true;
             while (working) {
                 try {
-                    while (!processWindow) {
-                        waitQuietly();
-                    }
+                    lock.wait();
 
                     boolean dataLoopEnd = false;
                     boolean dataEnd = false;
 
                     for (;;) {
                         Iterator<Task> it = currentWindow.tasks.iterator();
-                        /*System.out.println("currentWindow: " + currentWindow.startTime + " " + currentWindow.endTime);
-                        windows.forEach((w) -> {
-                            System.out.println("\t: " + w.startTime + " " + w.endTime);
-                        });*/
                         while (it.hasNext()) {
                             Task task = it.next();
                             switch (task.type()) {
@@ -329,15 +333,16 @@
                     if (dataEnd) {
                         working = false;
                     }
-
-                    removeWindow(currentWindow);
-
-                    processWindow = false;
-                    lock.notifyAll();
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
                         logger.debug(e.getMessage(), e);
                     }
+                } finally {
+                    removeWindow(currentWindow);
+                    processWindow = false;
+                    synchronized (readerLock) {
+                        readerLock.notifyAll();
+                    }
                 }
             }
         }