Mercurial > stress-tester
changeset 1032:d136672f267c
FlowWorkerBase refactorization in progress
author | Devel 2 |
---|---|
date | Thu, 02 Apr 2020 15:34:59 +0200 |
parents | 37d098b33b23 |
children | 386815ce52ee |
files | stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java |
diffstat | 4 files changed, 47 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java Thu Apr 02 14:02:38 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java Thu Apr 02 15:34:59 2020 +0200 @@ -27,11 +27,12 @@ public static final byte CODE_IO_ERROR = 13; public static final byte CODE_MAX_ENCODER_ERRORS_REACHED = 14; public static final byte CODE_MAX_DECODER_ERRORS_REACHED = 15; - public static final byte CODE_IDE_TIMEOUT = 16; + public static final byte CODE_MAX_SEND_ERRORS_REACHED = 16; + public static final byte CODE_IDE_TIMEOUT = 17; - public static final byte CODE_SSL_ERROR = 17; - public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18; - public static final byte CODE_SSL_HANDSHAKE_ERROR = 19; + public static final byte CODE_SSL_ERROR = 18; + public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 19; + public static final byte CODE_SSL_HANDSHAKE_ERROR = 20; public static final byte CODE_INTERNAL_ERROR = (byte) 255;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 02 14:02:38 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 02 15:34:59 2020 +0200 @@ -41,6 +41,8 @@ private int maxEncoderErrors = 3; + private int maxSendErrors = 3; + private long reconnectDelay = 1000; private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; @@ -112,6 +114,14 @@ this.maxEncoderErrors = maxEncoderErrors; } + public int getMaxSendErrors() { + return maxSendErrors; + } + + public void setMaxSendErrors(int maxSendErrors) { + this.maxSendErrors = maxSendErrors; + } + protected void flowStateChanged(FlowContext context, int oldState) { } @@ -257,7 +267,6 @@ debug(flowContext, "Disconnect."); } - SessionInfo sessionInfo = null; long now = timeGenerator.currentTimeMillis(); flowContext.lock.lock(); try { @@ -266,10 +275,6 @@ return; } - /*if (removeFlow) { - sessionInfo = flowContext.session; - }*/ - flowContext.state = STATE_DISCONNECTING; flowContext.timeout = now + timeouts.getDisconnectingTimeout(); @@ -299,12 +304,6 @@ } finally { flowContext.lock.unlock(); } - - if (removeFlow && sessionInfo != null) { - synchronized (sessions) { - sessions.remove(sessionInfo); - } - } } protected void errorInternal(FlowContext flowContext, Throwable cause) { @@ -486,8 +485,6 @@ flowContext.signal(); flowContext.unlock(); } - - } @Override @@ -510,9 +507,11 @@ @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { FlowContext flowContext = (FlowContext) context.getAttachment(); + logger.debug("dataReceived"); flowContext.lock(); try { try { + logger.debug("dataReceived-after lock"); FlowHandler client = flowContext.client(); FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); decoder.decode(data, flowContext); @@ -651,8 +650,10 @@ } protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { + logger.debug("send"); flowContext.lock(); try { + logger.debug("send-after lock"); Object req = event.getRequest(); if (req != null) { if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { @@ -686,18 +687,23 @@ } try { + flowContext.sentEvent = event; flowContext.channelContext().writeAndFlush(buffer); requestSent0(flowContext, event); buffer.clear(); + + if (wait && flowContext.isBidirectional()) { + waitForResponse(flowContext); + } } catch (Exception e) { flowContext.sendErrors++; if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); } - } - if (wait && flowContext.isBidirectional()) { - waitForResponse(flowContext); + if (flowContext.sendErrors == maxSendErrors) { + error(flowContext, new FlowError(CODE_MAX_SEND_ERRORS_REACHED, "Max send errors reached.")); + } } } } catch (Exception e) { @@ -712,8 +718,9 @@ } protected boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException { + logger.debug("waitForResponse"); long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); - while (flowContext.sentEvent != null && flowContext.state == STATE_CONNECTED) { + while (flowContext.sentEvent != null && !flowContext.isError()) { if (timeNanos <= 0) { return false; }
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Apr 02 14:02:38 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Apr 02 15:34:59 2020 +0200 @@ -9,6 +9,7 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseEncoder; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import com.passus.st.utils.EventUtils; @@ -18,6 +19,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static com.passus.st.utils.Assert.assertHttpClientEvents; import static org.testng.AssertJUnit.assertEquals; @@ -25,7 +29,7 @@ public class SynchFlowWorkerTest { - public static final long JOIN_TIMEOUT = 1_000; + public static final long JOIN_TIMEOUT = Long.MAX_VALUE; private final TestHttpClientListener listener = new TestHttpClientListener(); @@ -37,6 +41,8 @@ private final DataEncoder encoder; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + public LocalEmitter() { this(HttpResponseEncoder.INSTANCE); } @@ -76,11 +82,14 @@ HttpResponse response = (HttpResponse) event.getResponse(); ByteBuff buff = new HeapByteBuff(); encoder.encode(response, buff); - try { - clientWorker.dataReceived(channelContext, buff); - } catch (Exception ex) { - ex.printStackTrace(); - } + + executor.execute(() -> { + try { + clientWorker.dataReceived(channelContext, buff); + } catch (Exception ex) { + ex.printStackTrace(); + } + }); } protected void close(LocalChannelContext channelContext) { @@ -251,7 +260,7 @@ } - assertFalse(worker.isWorking()); + assertFalse("Worker is still working.", worker.isWorking()); } private List<Event> readDefaultEvents() throws Exception { @@ -290,7 +299,7 @@ } @Test - public void testHandle_EmitterException() throws Exception { + public void testHandle_EmitterException_SendErrorsNotReached() throws Exception { List<Event> events = readDefaultEvents(); LocalEmitter emitter = new LocalEmitter((object, out) -> { throw new RuntimeException("Test exception");
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Apr 02 14:02:38 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Apr 02 15:34:59 2020 +0200 @@ -54,7 +54,7 @@ try { flowExecutor.start(); events.forEach(flowExecutor::handle); - flowExecutor.join(2_000); + flowExecutor.join(5_000); assertEquals(0, listener.getErrors()); assertEquals(2, listener.getReceived().size());