changeset 528:f6b363ed693b

HttpFlowBasedClientWorker improvements
author Devel 2
date Thu, 31 Aug 2017 10:09:49 +0200
parents 05d63b5bf1d1
children a19cc6e1ee65
files stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java
diffstat 1 files changed, 101 insertions(+), 91 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Thu Aug 31 09:55:22 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Thu Aug 31 10:09:49 2017 +0200
@@ -1,6 +1,8 @@
 package com.passus.st.client.http;
 
 import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
 import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
 import com.passus.data.HeapByteBuff;
@@ -32,7 +34,7 @@
  *
  * @author Mirosław Hawrot
  */
-public abstract class HttpFlowBasedClientWorker extends HttpClientWorker {
+public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware {
 
     public static final Map<Integer, Long> DEFAULT_TIMEOUTS;
 
@@ -68,6 +70,8 @@
 
     protected final HttpScopes scopes;
 
+    protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
     public HttpFlowBasedClientWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
         timeouts.putAll(DEFAULT_TIMEOUTS);
@@ -93,6 +97,17 @@
         return count;
     }
 
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
     public float getSleepFactor() {
         return sleepFactor;
     }
@@ -102,64 +117,78 @@
         this.sleepFactor = sleepFactor;
     }
 
-    protected final void changeFlowState(HttpFlowContext flowContext, int state) {
-        if (flowContext.state == state) {
-            return;
-        }
-
-        int oldState = flowContext.state;
-        if (logger.isDebugEnabled()) {
-            debug(flowContext, "Flow status changing {} -> {}.",
-                    flowContext.stateString(), HttpFlowContext.contextStateToString(state)
-            );
-        }
+    public long getCheckTimeoutsPeriod() {
+        return checkTimeoutsPeriod;
+    }
 
-        switch (state) {
-            case HttpFlowContext.STATE_CONNECTING:
-                flowContext.clear();
-                break;
-            case HttpFlowContext.STATE_CONNECTED:
-                flowContext.decoder = new HttpFullMessageDecoder();
-                flowContext.decoder.setDecodeRequest(false);
-                flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY);
-                break;
-            case HttpFlowContext.STATE_ERROR:
-                flowContext.sentEvent = null;
-                break;
-            case HttpFlowContext.STATE_RESP_RECEIVED:
-                flowContext.sentEvent = null;
-                flowContext.receivedStartTimestamp = -1;
-                break;
-            case HttpFlowContext.STATE_DISCONNECTING:
-                if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) {
-                    if (flowContext.channelContext != null) {
-                        try {
-                            flowContext.channelContext.close();
-                        } catch (Exception e) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug(e.getMessage(), e);
+    public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) {
+        Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod");
+        this.checkTimeoutsPeriod = checkTimeoutsPeriod;
+    }
+
+    protected final void changeFlowState(HttpFlowContext flowContext, int state) {
+        try {
+            if (flowContext.state == state) {
+                return;
+            }
+
+            int oldState = flowContext.state;
+            if (logger.isDebugEnabled()) {
+                debug(flowContext, "Flow status changing {} -> {}.",
+                        flowContext.stateString(), HttpFlowContext.contextStateToString(state)
+                );
+            }
+
+            switch (state) {
+                case HttpFlowContext.STATE_CONNECTING:
+                    flowContext.clear();
+                    break;
+                case HttpFlowContext.STATE_CONNECTED:
+                    flowContext.decoder = new HttpFullMessageDecoder();
+                    flowContext.decoder.setDecodeRequest(false);
+                    flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY);
+                    break;
+                case HttpFlowContext.STATE_ERROR:
+                    changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED);
+                    break;
+                case HttpFlowContext.STATE_RESP_RECEIVED:
+                    flowContext.sentEvent = null;
+                    flowContext.receivedStartTimestamp = -1;
+                    break;
+                case HttpFlowContext.STATE_DISCONNECTING:
+                    if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) {
+                        if (flowContext.channelContext != null) {
+                            try {
+                                flowContext.channelContext.close();
+                            } catch (Exception e) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug(e.getMessage(), e);
+                                }
                             }
+                        } else {
+                            changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED);
                         }
                     } else {
-                        changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED);
+                        return;
                     }
-                } else {
+                    break;
+                case HttpFlowContext.STATE_DISCONNECTED:
+                    flowContext.sentEvent = null;
+                    flowContext.state = HttpFlowContext.STATE_DISCONNECTED;
+                    flowContext.timeout = -1;
+                    flowContext.clear();
+                    removeFlowContext(flowContext);
+                    flowStateChanged(flowContext, oldState);
                     return;
-                }
-                break;
-            case HttpFlowContext.STATE_DISCONNECTED:
-                flowContext.state = HttpFlowContext.STATE_DISCONNECTED;
-                flowContext.timeout = -1;
-                flowContext.clear();
-                removeFlowContext(flowContext);
-                flowStateChanged(flowContext, oldState);
-                return;
+            }
+
+            long timeout = timeouts.get(flowContext.state);
+            flowContext.timeout = timeGenerator.currentTimeMillis() + timeout;
+            flowContext.state = state;
+            flowStateChanged(flowContext, oldState);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
         }
-
-        long timeout = timeouts.get(flowContext.state);
-        flowContext.timeout = System.currentTimeMillis() + timeout;
-        flowContext.state = state;
-        flowStateChanged(flowContext, oldState);
     }
 
     protected void flowStateChanged(HttpFlowContext context, int oldState) {
@@ -349,21 +378,15 @@
     public void channelActive(ChannelContext context) throws Exception {
         synchronized (lock) {
             HttpFlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
-                                context.getLocalAddress(),
-                                context.getRemoteAddress());
-                    }
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})",
+                            context.getLocalAddress(),
+                            context.getRemoteAddress());
+                }
 
-                    flowContext.channelContext = context;
-                    changeFlowState(flowContext, STATE_CONNECTED);
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
+                flowContext.channelContext = context;
+                changeFlowState(flowContext, STATE_CONNECTED);
             }
 
             lock.notifyAll();
@@ -374,20 +397,13 @@
     public void channelInactive(ChannelContext context) throws Exception {
         synchronized (lock) {
             HttpFlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
-                    if (logger.isDebugEnabled()) {
-                        debug(flowContext, "Channel inactive.");
-                    }
+            if (flowContext != null) {
+                if (logger.isDebugEnabled()) {
+                    debug(flowContext, "Channel inactive.");
+                }
 
-                    changeFlowState(flowContext, STATE_DISCONNECTED);
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
+                changeFlowState(flowContext, STATE_DISCONNECTED);
             }
-
             lock.notifyAll();
         }
     }
@@ -405,11 +421,11 @@
                     }
 
                     if (req != null) {
-                        decoder.setHeadResponse(req.getMethod().equals(HttpMethod.HEAD));
+                        decoder.setRequestMethod(HttpMethod.HEAD);
                     }
 
                     decoder.decode(data);
-                    long now = System.currentTimeMillis();
+                    long now = timeGenerator.currentTimeMillis();
                     if (flowContext.receivedStartTimestamp == -1) {
                         flowContext.receivedStartTimestamp = now;
                     }
@@ -484,7 +500,7 @@
         synchronized (lock) {
             HttpFlowContext flowContext = flowContext(context);
             if (flowContext != null && flowContext.sentEvent != null) {
-                long now = System.currentTimeMillis();
+                long now = timeGenerator.currentTimeMillis();
                 if (collectMetric) {
                     synchronized (metric) {
                         metric.addRequestSendingTime(now - flowContext.sendStartTimestamp);
@@ -507,14 +523,8 @@
 
         synchronized (lock) {
             HttpFlowContext flowContext = flowContext(context);
-            try {
-                if (flowContext != null) {
-                    changeFlowState(flowContext, HttpFlowContext.STATE_ERROR);
-                }
-            } catch (Exception e) {
-                if (logger.isDebugEnabled()) {
-                    debug(flowContext, e.getMessage(), e);
-                }
+            if (flowContext != null) {
+                changeFlowState(flowContext, HttpFlowContext.STATE_ERROR);
             }
 
             lock.notifyAll();
@@ -545,7 +555,7 @@
                 }
 
                 try {
-                    long now = System.currentTimeMillis();
+                    long now = timeGenerator.currentTimeMillis();
                     req.setTag(TAG_TIME_START, now);
                     context.sendStartTimestamp = now;
                     changeFlowState(context, HttpFlowContext.STATE_REQ_SENT);
@@ -573,7 +583,7 @@
     protected void processTimeouts() {
         synchronized (lock) {
             try {
-                long now = System.currentTimeMillis();
+                long now = timeGenerator.currentTimeMillis();
                 if (nextCheckTimeoutsTime == -1) {
                     nextCheckTimeoutsTime = now + checkTimeoutsPeriod;
                 } else if (nextCheckTimeoutsTime > now) {