Mercurial > stress-tester
changeset 1021:7a4efb7ff5b4
FlowWorkerBase refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java Mon Mar 30 13:41:01 2020 +0200 @@ -237,7 +237,7 @@ } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { currFlowContext = flowContext((SessionEvent) event); if (currFlowContext != null) { - if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) { + if (!currFlowContext.isEventSent()) { SessionInfo session = statusEvent.getSessionInfo(); close(session); currFlowContext = null;
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Mon Mar 30 13:41:01 2020 +0200 @@ -80,7 +80,7 @@ do { wait = false; for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() == FlowContext.STATE_REQ_SENT) { + if (flowContext.isEventSent()) { wait = true; break; } @@ -233,7 +233,7 @@ } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { FlowContext flowContext = flowContext(statusEvent); if (flowContext != null) { - if (flowContext.state() != FlowContext.STATE_REQ_SENT) { + if (!flowContext.isEventSent()) { disconnect(statusEvent); } } @@ -247,12 +247,11 @@ if (flowContext != null) { switch (flowContext.state()) { case FlowContext.STATE_CONNECTING: - case FlowContext.STATE_REQ_SENT: return false; case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_RESP_RECEIVED: - case FlowContext.STATE_ERROR: - if (send(flowContext, (SessionPayloadEvent) event)) { + if(flowContext.isEventSent()) { + return false; + } else if (send(flowContext, (SessionPayloadEvent) event)) { return true; } case FlowContext.STATE_DISCONNECTING: @@ -274,8 +273,7 @@ SessionEvent sessEvent = event; FlowContext flowContext = flowContext(sessEvent); if (flowContext != null) { - return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED - || flowContext.state() >= FlowContext.STATE_DISCONNECTING); + return (flowContext.state() >= FlowContext.STATE_DISCONNECTING); } return true;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Mon Mar 30 13:41:01 2020 +0200 @@ -11,13 +11,13 @@ public class FlowContext { - public static final int STATE_ERROR = -1; - public static final int STATE_CONNECTING = 0; - public static final int STATE_CONNECTED = 1; - public static final int STATE_REQ_SENT = 2; - public static final int STATE_RESP_RECEIVED = 3; - public static final int STATE_DISCONNECTING = 4; - public static final int STATE_DISCONNECTED = 5; + public static final byte STATE_ERROR = -1; + public static final byte STATE_CONNECTING = 0; + public static final byte STATE_CONNECTED = 1; + /* public static final byte STATE_REQ_SENT = 2; + public static final byte STATE_RESP_RECEIVED = 3;*/ + public static final byte STATE_DISCONNECTING = 4; + public static final byte STATE_DISCONNECTED = 5; public static final int INIT_BUFFER_CAPACITY = 1024; @@ -33,13 +33,13 @@ protected SessionPayloadEvent sentEvent; - protected int state = STATE_CONNECTING; + protected byte state = STATE_CONNECTING; protected long timeout = -1; - protected long receivedStartTimestamp = -1; + protected long receivedStartTime = -1; - protected long sendStartTimestamp = -1; + protected long sendStartTime = -1; private int loop; @@ -55,6 +55,8 @@ protected int sendErrors; + protected FlowError lastError; + @Deprecated protected DataDecoder decoder; @@ -88,11 +90,11 @@ this.client = client; } - public void state(int state) { + public void state(byte state) { this.state = state; } - public int state() { + public byte state() { return state; } @@ -105,25 +107,29 @@ } public long receivedStartTimestamp() { - return receivedStartTimestamp; + return receivedStartTime; } public void receivedStartTimestamp(long receivedStartTimestamp) { - this.receivedStartTimestamp = receivedStartTimestamp; + this.receivedStartTime = receivedStartTimestamp; } public long sendStartTimestamp() { - return sendStartTimestamp; + return sendStartTime; } public void sendStartTimestamp(long sendStartTimestamp) { - this.sendStartTimestamp = sendStartTimestamp; + this.sendStartTime = sendStartTimestamp; } public SessionPayloadEvent sentEvent() { return sentEvent; } + public boolean isEventSent() { + return (sentEvent != null); + } + public void sentEvent(SessionPayloadEvent sentEvent) { this.sentEvent = sentEvent; } @@ -169,6 +175,10 @@ return blocked; } + public FlowError getLastError() { + return lastError; + } + public Object getParam(String name) { if (params == null) { return null; @@ -235,10 +245,10 @@ return "connecting"; case STATE_CONNECTED: return "connected"; - case STATE_REQ_SENT: +/* case STATE_REQ_SENT: return "req_sent"; case STATE_RESP_RECEIVED: - return "resp_received"; + return "resp_received";*/ case STATE_DISCONNECTING: return "disconnecting"; case STATE_DISCONNECTED:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java Mon Mar 30 13:41:01 2020 +0200 @@ -0,0 +1,115 @@ +package com.passus.st.client; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.ConnectException; +import java.net.SocketException; + +public class FlowError { + + public static final int CODE_UNKNOWN_ERROR = 0; + public static final int CODE_CONNECTION_ERROR = 1; + public static final int CODE_CONNECTION_REFUSED = 2; + public static final int CODE_CONNECTION_TIMEOUT = 3; + public static final int CODE_UNKNOWN_HOST = 4; + public static final int CODE_NO_ROUTE_TO_HOST = 5; + public static final int CODE_NETWORK_UNREACHABLE = 6; + + public static final int CODE_READ_TIMEOUT = 7; + public static final int CODE_INVALID_RESPONSE = 8; + public static final int CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 9; + public static final int CODE_CONNECTION_RESET_BY_PEER = 10; + public static final int CODE_SOCKET_ERROR = 11; + public static final int CODE_IO_ERROR = 12; + + public static final int CODE_SSL_ERROR = 13; + public static final int CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 14; + public static final int CODE_SSL_HANDSHAKE_ERROR = 15; + + private final int code; + + private final String message; + + private final Exception cause; + + public FlowError(int code, String message) { + this(code, message, null); + } + + public FlowError(int code, String message, Exception cause) { + this.code = code; + this.message = message; + this.cause = cause; + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } + + public Exception getCause() { + return cause; + } + + public static FlowError interpret(Exception ex, String stackTrace) { + if (stackTrace.contains("Connection refused")) { + return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex); + } else if (stackTrace.contains("connect timed out")) { + return new FlowError(CODE_CONNECTION_TIMEOUT, "Connection timed out.", ex); + } else if (stackTrace.contains("Read timed out")) { + return new FlowError(CODE_READ_TIMEOUT, "Response read timed out.", ex); + } else if (stackTrace.contains("Invalid Http response")) { + return new FlowError(CODE_INVALID_RESPONSE, "Invalid response.", ex); + } else if (stackTrace.contains("UnknownHostException")) { + return new FlowError(CODE_UNKNOWN_HOST, "Unknown host. Unable to resolve host name.", ex); + } else if (stackTrace.contains("No route to host")) { + return new FlowError(CODE_NO_ROUTE_TO_HOST, "No route to host.", ex); + } else if (stackTrace.contains("Network is unreachable")) { + return new FlowError(CODE_NETWORK_UNREACHABLE, "Network is unreachable.", ex); + } else if (stackTrace.contains("Software caused connection abort")) { + return new FlowError(CODE_CONNECTION_CLOSED_UNEXPECTEDLY, "Connection closed unexpectedly.", ex); + } else if (stackTrace.contains("Connection reset by peer")) { + return new FlowError(CODE_CONNECTION_RESET_BY_PEER, "Connection reset by peer.", ex); + } else if (stackTrace.contains("Unrecognized SSL message")) { + return new FlowError(CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR, "Unrecognized SSL message. Probably plaintext connection.", ex); + } else if (stackTrace.contains("SSLHandshakeException")) { + return new FlowError(CODE_SSL_HANDSHAKE_ERROR, "SSL handshake error.", ex); + } + + Throwable cause = ex.getCause(); + if (cause instanceof SSLException) { + return new FlowError(CODE_SSL_ERROR, "SSL error.", ex); + } else if (cause instanceof ConnectException) { + return new FlowError(CODE_CONNECTION_ERROR, "Connection error.", ex); + } else if (cause instanceof SocketException) { + return new FlowError(CODE_SOCKET_ERROR, "Networking socket error.", ex); + } else if (cause instanceof IOException) { + return new FlowError(CODE_IO_ERROR, "Networking IO error.", ex); + } + + return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex); + } + + public static FlowError interpret(Exception ex) { + try { + String stackTrace; + try (StringWriter sw = new StringWriter(); + PrintWriter w = new PrintWriter(sw)) { + ex.printStackTrace(w); + stackTrace = sw.toString(); + } + + return interpret(ex, stackTrace); + } catch (IOException ignore) { + + } + + return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex); + } + +}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Mon Mar 30 13:41:01 2020 +0200 @@ -12,14 +12,12 @@ public class FlowUtils { - public static final Map<Integer, Long> DEFAULT_TIMEOUTS; + public static final Map<Byte, Long> DEFAULT_TIMEOUTS; static { - Map<Integer, Long> defaultTimeouts = new HashMap<>(); + Map<Byte, Long> defaultTimeouts = new HashMap<>(); defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L); defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L); - defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L); - defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L); defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE); defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L); defaultTimeouts.put(STATE_DISCONNECTED, 0L);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Mar 30 13:41:01 2020 +0200 @@ -9,8 +9,8 @@ import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; import com.passus.st.metric.MetricsContainer; -import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; -import it.unimi.dsi.fastutil.ints.Int2LongMap; +import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap; +import it.unimi.dsi.fastutil.bytes.Byte2LongMap; import java.io.IOException; import java.util.HashSet; @@ -29,7 +29,7 @@ private final Set<SessionInfo> blockedSessions = new HashSet<>(); - private final Int2LongMap timeouts = new Int2LongArrayMap(); + private final Byte2LongMap timeouts = new Byte2LongArrayMap(); protected final Object lock = new Object(); @@ -264,17 +264,29 @@ updateFlowState(flowContext, STATE_ERROR); } - private void responseReceived(FlowContext flowContext) { + private void requestSent0(FlowContext flowContext) { + requestSent(flowContext); + } + + protected void requestSent(FlowContext flowContext) { + + } + + private void responseReceived0(FlowContext flowContext) { + responseReceived(flowContext); flowContext.sentEvent(null); flowContext.receivedStartTimestamp(-1); - updateFlowState(flowContext, STATE_RESP_RECEIVED); } - private void updateFlowState(FlowContext flowContext, int newState) { + protected void responseReceived(FlowContext flowContext) { + + } + + private void updateFlowState(FlowContext flowContext, byte newState) { updateFlowState(flowContext, newState, flowContext.state); } - private void updateFlowState(FlowContext flowContext, int newState, int oldState) { + private void updateFlowState(FlowContext flowContext, byte newState, byte oldState) { if (logger.isDebugEnabled()) { debug(flowContext, "Flow status changing {} -> {}.", contextStateToString(flowContext.state()), @@ -430,7 +442,7 @@ } decoder.clear(flowContext); - responseReceived(flowContext); + responseReceived0(flowContext); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { if (collectMetric) { synchronized (metric) { @@ -454,7 +466,7 @@ } decoder.clear(flowContext); - responseReceived(flowContext); + responseReceived0(flowContext); } } catch (Exception e) { if (collectMetric) { @@ -498,7 +510,7 @@ flowContext.client().onDataWriteEnd(flowContext); if (!flowContext.isBidirectional()) { - responseReceived(flowContext); + responseReceived0(flowContext); } } @@ -576,9 +588,9 @@ } try { - updateFlowState(flowContext, FlowContext.STATE_REQ_SENT); flowContext.sentEvent(event); flowContext.channelContext().writeAndFlush(buffer); + requestSent0(flowContext); buffer.clear(); return true; } catch (Exception e) { @@ -612,17 +624,8 @@ switch (flowContext.state()) { case FlowContext.STATE_CONNECTING: case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_REQ_SENT: - case FlowContext.STATE_ERROR: disconnect(flowContext); break; - case FlowContext.STATE_RESP_RECEIVED: - //Dziwny blad nie powinien wystepowac - break; - case FlowContext.STATE_DISCONNECTING: - case STATE_DISCONNECTED: - removeFlowContext(flowContext); - break; } } } @@ -645,5 +648,4 @@ } } - }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Mon Mar 30 13:41:01 2020 +0200 @@ -109,7 +109,7 @@ while (it.hasNext()) { LocalFlowContext indexFlowContext = it.next(); if (indexFlowContext.eventsQueue.isEmpty() - && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) { + && !indexFlowContext.isEventSent()) { disconnect(flowContext); if (--diff == 0) { break; @@ -122,25 +122,21 @@ } private boolean canSend(FlowContext flowContext) { - int state = flowContext.state(); - return (state == FlowContext.STATE_CONNECTED - || state == FlowContext.STATE_RESP_RECEIVED - || state == FlowContext.STATE_ERROR - || state == FlowContext.STATE_REQ_SENT); + return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent(); } @Override protected void flowStateChanged(FlowContext flowContext, int oldState) { LocalFlowContext localFlowContext = (LocalFlowContext) flowContext; - if (oldState == FlowContext.STATE_REQ_SENT) { + /*if (oldState == FlowContext.STATE_REQ_SENT) { if (semaphore.availablePermits() <= maxSentRequests) { semaphore.release(); } - } + }*/ if (closeAllConnections) { if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && localFlowContext.state() != FlowContext.STATE_REQ_SENT + && !localFlowContext.isEventSent() && localFlowContext.eventsQueue.isEmpty()) { disconnect(flowContext); return; @@ -149,7 +145,7 @@ if (localFlowContext.state() >= FlowContext.STATE_CONNECTED && localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && localFlowContext.state() != FlowContext.STATE_REQ_SENT + && !localFlowContext.isEventSent() && !localFlowContext.eventsQueue.isEmpty()) { Event event = localFlowContext.eventsQueue.peek(); @@ -216,7 +212,7 @@ LocalFlowContext flowContext = flowContext((SessionEvent) event); if (flowContext != null) { if (flowContext.eventsQueue.isEmpty() - && flowContext.state() != FlowContext.STATE_REQ_SENT) { + && !flowContext.isEventSent()) { disconnect(statusEvent); } else { addToQueue(flowContext, event); @@ -233,7 +229,7 @@ if (flowContext.eventsQueue.isEmpty() && (flowContext.state() == FlowContext.STATE_CONNECTED || flowContext.state() == FlowContext.STATE_ERROR - || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) { + || flowContext.isEventSent())) { send(flowContext, payloadEvent); } else { addToQueue(flowContext, event);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon Mar 30 13:41:01 2020 +0200 @@ -54,7 +54,6 @@ if (context == currFlowContext) { if (context.state() == FlowContext.STATE_CONNECTED - || context.state() == FlowContext.STATE_RESP_RECEIVED || context.state() == FlowContext.STATE_ERROR || context.state() == FlowContext.STATE_DISCONNECTED) { currFlowContext = null; @@ -83,7 +82,7 @@ do { wait = false; for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() == FlowContext.STATE_REQ_SENT) { + if (flowContext.isEventSent()) { wait = true; break; } @@ -143,7 +142,7 @@ } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { currFlowContext = flowContext((SessionEvent) event); if (currFlowContext != null) { - if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) { + if (!currFlowContext.isEventSent()) { disconnect(statusEvent); } } @@ -159,7 +158,6 @@ if (flowContext != null) { switch (flowContext.state()) { case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_RESP_RECEIVED: case FlowContext.STATE_ERROR: currFlowContext = flowContext; if (send(flowContext, (SessionPayloadEvent) event)) {
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Mon Mar 30 13:41:01 2020 +0200 @@ -58,7 +58,7 @@ events.forEach(worker::handle); - worker.join(); + worker.join(2_000); assertTrue(listner.size() > 0); assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Mar 26 13:27:52 2020 +0100 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Mon Mar 30 13:41:01 2020 +0200 @@ -53,7 +53,7 @@ try { flowExecutor.start(); events.forEach(flowExecutor::handle); - flowExecutor.join(); + flowExecutor.join(2_000); assertEquals(0, listener.getErrors()); assertEquals(2, listener.getReceived().size());