Mercurial > stress-tester
changeset 1023:831da81c262f
FlowWorkerBase refactorization in progress
author | Devel 2 |
---|---|
date | Tue, 31 Mar 2020 11:55:55 +0200 |
parents | fa7ecf793f0c |
children | 14adc04b1b8d |
files | stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java |
diffstat | 5 files changed, 109 insertions(+), 64 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Mon Mar 30 15:27:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Tue Mar 31 11:55:55 2020 +0200 @@ -14,8 +14,6 @@ public static final byte STATE_ERROR = -1; public static final byte STATE_CONNECTING = 0; public static final byte STATE_CONNECTED = 1; - /* public static final byte STATE_REQ_SENT = 2; - public static final byte STATE_RESP_RECEIVED = 3;*/ public static final byte STATE_DISCONNECTING = 4; public static final byte STATE_DISCONNECTED = 5; @@ -55,7 +53,7 @@ protected int sendErrors; - protected FlowError lastError; + protected FlowError error; @Deprecated protected DataDecoder decoder; @@ -106,6 +104,24 @@ return timeout; } + public boolean isBlocked() { + return blocked; + } + + public void error(FlowError error) { + this.error = error; + this.state = STATE_ERROR; + blocked = true; + } + + public FlowError error() { + return error; + } + + public boolean isError() { + return error != null; + } + public long receivedStartTimestamp() { return receivedStartTime; } @@ -171,14 +187,6 @@ return session; } - public boolean isBlocked() { - return blocked; - } - - public FlowError getLastError() { - return lastError; - } - public Object getParam(String name) { if (params == null) { return null; @@ -226,6 +234,8 @@ buffer = null; sentEvent = null; timeout = -1; + error = null; + blocked = false; if (params != null) { params.clear();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java Mon Mar 30 15:27:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java Tue Mar 31 11:55:55 2020 +0200 @@ -9,54 +9,76 @@ public class FlowError { - public static final int CODE_UNKNOWN_ERROR = 0; - public static final int CODE_CONNECTION_ERROR = 1; - public static final int CODE_CONNECTION_REFUSED = 2; - public static final int CODE_CONNECTION_TIMEOUT = 3; - public static final int CODE_UNKNOWN_HOST = 4; - public static final int CODE_NO_ROUTE_TO_HOST = 5; - public static final int CODE_NETWORK_UNREACHABLE = 6; + public static final byte CODE_UNKNOWN_ERROR = 0; - public static final int CODE_READ_TIMEOUT = 7; - public static final int CODE_INVALID_RESPONSE = 8; - public static final int CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 9; - public static final int CODE_CONNECTION_RESET_BY_PEER = 10; - public static final int CODE_SOCKET_ERROR = 11; - public static final int CODE_IO_ERROR = 12; + public static final byte CODE_CONNECTION_ERROR = 1; + public static final byte CODE_CONNECTION_REFUSED = 2; + public static final byte CODE_CONNECTION_TIMEOUT = 3; + public static final byte CODE_UNKNOWN_HOST = 4; + public static final byte CODE_NO_ROUTE_TO_HOST = 5; + public static final byte CODE_NETWORK_UNREACHABLE = 6; + public static final byte CODE_CONNECTION_ATTEMPTS_REACHED = 7; - public static final int CODE_SSL_ERROR = 13; - public static final int CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 14; - public static final int CODE_SSL_HANDSHAKE_ERROR = 15; + public static final byte CODE_READ_TIMEOUT = 8; + public static final byte CODE_INVALID_RESPONSE = 9; + public static final byte CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 10; + public static final byte CODE_CONNECTION_RESET_BY_PEER = 11; + public static final byte CODE_SOCKET_ERROR = 12; + public static final byte CODE_IO_ERROR = 13; + public static final byte CODE_MAX_ENCODER_ERRORS_REACHED = 14; + public static final byte CODE_MAX_DECODER_ERRORS_REACHED = 15; + public static final byte CODE_IDE_TIMEOUT = 16; - private final int code; + public static final byte CODE_SSL_ERROR = 17; + public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18; + public static final byte CODE_SSL_HANDSHAKE_ERROR = 19; + + private final byte code; private final String message; - private final Exception cause; + private final Throwable cause; - public FlowError(int code, String message) { + public FlowError(byte code, String message) { this(code, message, null); } - public FlowError(int code, String message, Exception cause) { + public FlowError(byte code, String message, Throwable cause) { this.code = code; this.message = message; this.cause = cause; } - public int getCode() { + public byte code() { return code; } - public String getMessage() { + public String message() { return message; } - public Exception getCause() { + public Throwable cause() { return cause; } - public static FlowError interpret(Exception ex, String stackTrace) { + @Override + public String toString() { + return "FlowError{" + + "code=" + code + + ", message='" + message + '\'' + + ", cause=" + cause + + '}'; + } + + public static FlowError unknownError() { + return unknownError(null); + } + + public static FlowError unknownError(Throwable cause) { + return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", cause); + } + + public static FlowError interpret(Throwable ex, String stackTrace) { if (stackTrace.contains("Connection refused")) { return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex); } else if (stackTrace.contains("connect timed out")) { @@ -92,10 +114,10 @@ return new FlowError(CODE_IO_ERROR, "Networking IO error.", ex); } - return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex); + return unknownError(ex); } - public static FlowError interpret(Exception ex) { + public static FlowError interpret(Throwable ex) { try { String stackTrace; try (StringWriter sw = new StringWriter(); @@ -109,7 +131,7 @@ } - return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex); + return unknownError(ex); } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Mar 30 15:27:57 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Tue Mar 31 11:55:55 2020 +0200 @@ -12,13 +12,14 @@ import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap; import it.unimi.dsi.fastutil.bytes.Byte2LongMap; -import java.io.IOException; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowError.CODE_CONNECTION_ATTEMPTS_REACHED; +import static com.passus.st.client.FlowError.CODE_MAX_ENCODER_ERRORS_REACHED; import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; public abstract class FlowWorkerBase extends FlowWorker { @@ -255,30 +256,39 @@ } } - private void error(FlowContext flowContext, Throwable cause) { + protected void error(FlowContext flowContext, Throwable cause) { + error(flowContext, FlowError.interpret(cause)); + } + + protected void error(FlowContext flowContext, FlowError error) { if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) { disconnect(flowContext, false); } - flowContext.blocked = true; - updateFlowState(flowContext, STATE_ERROR); + flowContext.error(error); + flowError(flowContext); } - private void requestSent0(FlowContext flowContext) { - requestSent(flowContext); - } - - protected void requestSent(FlowContext flowContext) { + protected void flowError(FlowContext flowContext) { } - private void responseReceived0(FlowContext flowContext) { - responseReceived(flowContext); + private void requestSent0(FlowContext flowContext, SessionPayloadEvent event) { + flowContext.sentEvent = event; + onRequestSent(flowContext, event); + } + + protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) { + + } + + private void responseReceived0(FlowContext flowContext, Object response) { + onResponseReceived(flowContext, response); flowContext.sentEvent(null); flowContext.receivedStartTimestamp(-1); } - protected void responseReceived(FlowContext flowContext) { + protected void onResponseReceived(FlowContext flowContext, Object response) { } @@ -442,7 +452,7 @@ } decoder.clear(flowContext); - responseReceived0(flowContext); + responseReceived0(flowContext, null); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { if (collectMetric) { synchronized (metric) { @@ -466,7 +476,7 @@ } decoder.clear(flowContext); - responseReceived0(flowContext); + responseReceived0(flowContext, resp); } } catch (Exception e) { if (collectMetric) { @@ -478,6 +488,8 @@ if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); } + + error(flowContext, FlowError.unknownError()); } lock.notifyAll(); @@ -510,7 +522,7 @@ flowContext.client().onDataWriteEnd(flowContext); if (!flowContext.isBidirectional()) { - responseReceived0(flowContext); + responseReceived0(flowContext, null); } } @@ -548,7 +560,7 @@ } } - error(flowContext, cause); + error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); lock.notifyAll(); } } @@ -574,7 +586,7 @@ } if (flowContext.encoderErrors == maxEncoderErrors) { - error(flowContext, new IOException()); + error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached.")); } return false; @@ -588,9 +600,8 @@ } try { - flowContext.sentEvent(event); flowContext.channelContext().writeAndFlush(buffer); - requestSent0(flowContext); + requestSent0(flowContext, event); buffer.clear(); return true; } catch (Exception e) {
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Mon Mar 30 15:27:57 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Tue Mar 31 11:55:55 2020 +0200 @@ -10,7 +10,6 @@ import org.mockito.stubbing.Answer; import org.testng.annotations.Test; -import java.io.IOException; import java.net.ConnectException; import static com.passus.net.session.Session.PROTOCOL_TCP; @@ -18,8 +17,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.*; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.*; public class FlowWorkerBaseTest { @@ -82,7 +80,6 @@ MutableInt count = new MutableInt(0); doAnswer((Answer<Void>) invocation -> applyConnect(invocation, (sessionInfo, handler, integer) -> { - if (count.intValue() == 1) { handler.channelActive(channelContext); } else { @@ -100,7 +97,8 @@ assertEquals(1, count.intValue()); assertEquals(FlowContext.STATE_CONNECTED, flowContext.state); assertEquals(0, flowContext.connectionAttempts); - assertEquals(false, flowContext.blocked); + assertFalse(flowContext.blocked); + assertFalse(flowContext.isError()); } @Test @@ -117,8 +115,12 @@ worker.connect(session); FlowContext flowContext = worker.flowContext(session); - assertEquals(FlowContext.STATE_ERROR, flowContext.state); assertEquals(true, flowContext.blocked); + assertTrue(flowContext.isError()); + FlowError error = flowContext.error(); + assertEquals(FlowError.CODE_CONNECTION_ATTEMPTS_REACHED, error.code()); + assertEquals("Max connection attempts reached.", error.message()); + assertEquals(null, error.cause()); } @Test
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Mon Mar 30 15:27:57 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Tue Mar 31 11:55:55 2020 +0200 @@ -79,7 +79,7 @@ }); flowExecutor.start(); events.forEach(flowExecutor::handle); - flowExecutor.join(); + flowExecutor.join(2_000); Dns dnsResponse = res.getValue(); assertNotNull(dnsResponse);