Mercurial > stress-tester
changeset 528:f6b363ed693b
HttpFlowBasedClientWorker improvements
author | Devel 2 |
---|---|
date | Thu, 31 Aug 2017 10:09:49 +0200 |
parents | 05d63b5bf1d1 |
children | a19cc6e1ee65 |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java |
diffstat | 1 files changed, 101 insertions(+), 91 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Thu Aug 31 09:55:22 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Thu Aug 31 10:09:49 2017 +0200 @@ -1,6 +1,8 @@ package com.passus.st.client.http; import com.passus.commons.Assert; +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; import com.passus.data.ByteBuff; import com.passus.data.DataDecoder; import com.passus.data.HeapByteBuff; @@ -32,7 +34,7 @@ * * @author Mirosław Hawrot */ -public abstract class HttpFlowBasedClientWorker extends HttpClientWorker { +public abstract class HttpFlowBasedClientWorker extends HttpClientWorker implements TimeAware { public static final Map<Integer, Long> DEFAULT_TIMEOUTS; @@ -68,6 +70,8 @@ protected final HttpScopes scopes; + protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + public HttpFlowBasedClientWorker(Emitter emitter, String name, int index) { super(emitter, name, index); timeouts.putAll(DEFAULT_TIMEOUTS); @@ -93,6 +97,17 @@ return count; } + @Override + public TimeGenerator getTimeGenerator() { + return timeGenerator; + } + + @Override + public void setTimeGenerator(TimeGenerator timeGenerator) { + Assert.notNull(timeGenerator, "timeGenerator"); + this.timeGenerator = timeGenerator; + } + public float getSleepFactor() { return sleepFactor; } @@ -102,64 +117,78 @@ this.sleepFactor = sleepFactor; } - protected final void changeFlowState(HttpFlowContext flowContext, int state) { - if (flowContext.state == state) { - return; - } - - int oldState = flowContext.state; - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow status changing {} -> {}.", - flowContext.stateString(), HttpFlowContext.contextStateToString(state) - ); - } + public long getCheckTimeoutsPeriod() { + return checkTimeoutsPeriod; + } - switch (state) { - case HttpFlowContext.STATE_CONNECTING: - flowContext.clear(); - break; - case HttpFlowContext.STATE_CONNECTED: - flowContext.decoder = new HttpFullMessageDecoder(); - flowContext.decoder.setDecodeRequest(false); - flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY); - break; - case HttpFlowContext.STATE_ERROR: - flowContext.sentEvent = null; - break; - case HttpFlowContext.STATE_RESP_RECEIVED: - flowContext.sentEvent = null; - flowContext.receivedStartTimestamp = -1; - break; - case HttpFlowContext.STATE_DISCONNECTING: - if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) { - if (flowContext.channelContext != null) { - try { - flowContext.channelContext.close(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); + public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) { + Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod"); + this.checkTimeoutsPeriod = checkTimeoutsPeriod; + } + + protected final void changeFlowState(HttpFlowContext flowContext, int state) { + try { + if (flowContext.state == state) { + return; + } + + int oldState = flowContext.state; + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow status changing {} -> {}.", + flowContext.stateString(), HttpFlowContext.contextStateToString(state) + ); + } + + switch (state) { + case HttpFlowContext.STATE_CONNECTING: + flowContext.clear(); + break; + case HttpFlowContext.STATE_CONNECTED: + flowContext.decoder = new HttpFullMessageDecoder(); + flowContext.decoder.setDecodeRequest(false); + flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY); + break; + case HttpFlowContext.STATE_ERROR: + changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED); + break; + case HttpFlowContext.STATE_RESP_RECEIVED: + flowContext.sentEvent = null; + flowContext.receivedStartTimestamp = -1; + break; + case HttpFlowContext.STATE_DISCONNECTING: + if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) { + if (flowContext.channelContext != null) { + try { + flowContext.channelContext.close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } } + } else { + changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED); } } else { - changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED); + return; } - } else { + break; + case HttpFlowContext.STATE_DISCONNECTED: + flowContext.sentEvent = null; + flowContext.state = HttpFlowContext.STATE_DISCONNECTED; + flowContext.timeout = -1; + flowContext.clear(); + removeFlowContext(flowContext); + flowStateChanged(flowContext, oldState); return; - } - break; - case HttpFlowContext.STATE_DISCONNECTED: - flowContext.state = HttpFlowContext.STATE_DISCONNECTED; - flowContext.timeout = -1; - flowContext.clear(); - removeFlowContext(flowContext); - flowStateChanged(flowContext, oldState); - return; + } + + long timeout = timeouts.get(flowContext.state); + flowContext.timeout = timeGenerator.currentTimeMillis() + timeout; + flowContext.state = state; + flowStateChanged(flowContext, oldState); + } catch (Exception e) { + logger.debug(e.getMessage(), e); } - - long timeout = timeouts.get(flowContext.state); - flowContext.timeout = System.currentTimeMillis() + timeout; - flowContext.state = state; - flowStateChanged(flowContext, oldState); } protected void flowStateChanged(HttpFlowContext context, int oldState) { @@ -349,21 +378,15 @@ public void channelActive(ChannelContext context) throws Exception { synchronized (lock) { HttpFlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); - } + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } - flowContext.channelContext = context; - changeFlowState(flowContext, STATE_CONNECTED); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } + flowContext.channelContext = context; + changeFlowState(flowContext, STATE_CONNECTED); } lock.notifyAll(); @@ -374,20 +397,13 @@ public void channelInactive(ChannelContext context) throws Exception { synchronized (lock) { HttpFlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); - } + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } - changeFlowState(flowContext, STATE_DISCONNECTED); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } + changeFlowState(flowContext, STATE_DISCONNECTED); } - lock.notifyAll(); } } @@ -405,11 +421,11 @@ } if (req != null) { - decoder.setHeadResponse(req.getMethod().equals(HttpMethod.HEAD)); + decoder.setRequestMethod(HttpMethod.HEAD); } decoder.decode(data); - long now = System.currentTimeMillis(); + long now = timeGenerator.currentTimeMillis(); if (flowContext.receivedStartTimestamp == -1) { flowContext.receivedStartTimestamp = now; } @@ -484,7 +500,7 @@ synchronized (lock) { HttpFlowContext flowContext = flowContext(context); if (flowContext != null && flowContext.sentEvent != null) { - long now = System.currentTimeMillis(); + long now = timeGenerator.currentTimeMillis(); if (collectMetric) { synchronized (metric) { metric.addRequestSendingTime(now - flowContext.sendStartTimestamp); @@ -507,14 +523,8 @@ synchronized (lock) { HttpFlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { - changeFlowState(flowContext, HttpFlowContext.STATE_ERROR); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } + if (flowContext != null) { + changeFlowState(flowContext, HttpFlowContext.STATE_ERROR); } lock.notifyAll(); @@ -545,7 +555,7 @@ } try { - long now = System.currentTimeMillis(); + long now = timeGenerator.currentTimeMillis(); req.setTag(TAG_TIME_START, now); context.sendStartTimestamp = now; changeFlowState(context, HttpFlowContext.STATE_REQ_SENT); @@ -573,7 +583,7 @@ protected void processTimeouts() { synchronized (lock) { try { - long now = System.currentTimeMillis(); + long now = timeGenerator.currentTimeMillis(); if (nextCheckTimeoutsTime == -1) { nextCheckTimeoutsTime = now + checkTimeoutsPeriod; } else if (nextCheckTimeoutsTime > now) {