changeset 1021:7a4efb7ff5b4

FlowWorkerBase refactorization in progress
author Devel 2
date Mon, 30 Mar 2020 13:41:01 +0200
parents 4678815d3051
children fa7ecf793f0c
files stress-tester/src/main/java/com/passus/st/PcapReporter.java stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowError.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java
diffstat 10 files changed, 187 insertions(+), 70 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/PcapReporter.java	Mon Mar 30 13:41:01 2020 +0200
@@ -237,7 +237,7 @@
                     } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                         currFlowContext = flowContext((SessionEvent) event);
                         if (currFlowContext != null) {
-                            if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                            if (!currFlowContext.isEventSent()) {
                                 SessionInfo session = statusEvent.getSessionInfo();
                                 close(session);
                                 currFlowContext = null;
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java	Mon Mar 30 13:41:01 2020 +0200
@@ -80,7 +80,7 @@
             do {
                 wait = false;
                 for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
+                    if (flowContext.isEventSent()) {
                         wait = true;
                         break;
                     }
@@ -233,7 +233,7 @@
                 } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                     FlowContext flowContext = flowContext(statusEvent);
                     if (flowContext != null) {
-                        if (flowContext.state() != FlowContext.STATE_REQ_SENT) {
+                        if (!flowContext.isEventSent()) {
                             disconnect(statusEvent);
                         }
                     }
@@ -247,12 +247,11 @@
                 if (flowContext != null) {
                     switch (flowContext.state()) {
                         case FlowContext.STATE_CONNECTING:
-                        case FlowContext.STATE_REQ_SENT:
                             return false;
                         case FlowContext.STATE_CONNECTED:
-                        case FlowContext.STATE_RESP_RECEIVED:
-                        case FlowContext.STATE_ERROR:
-                            if (send(flowContext, (SessionPayloadEvent) event)) {
+                            if(flowContext.isEventSent()) {
+                                return false;
+                            } else if (send(flowContext, (SessionPayloadEvent) event)) {
                                 return true;
                             }
                         case FlowContext.STATE_DISCONNECTING:
@@ -274,8 +273,7 @@
                 SessionEvent sessEvent = event;
                 FlowContext flowContext = flowContext(sessEvent);
                 if (flowContext != null) {
-                    return (flowContext.state() == FlowContext.STATE_RESP_RECEIVED
-                            || flowContext.state() >= FlowContext.STATE_DISCONNECTING);
+                    return (flowContext.state() >= FlowContext.STATE_DISCONNECTING);
                 }
 
                 return true;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Mon Mar 30 13:41:01 2020 +0200
@@ -11,13 +11,13 @@
 
 public class FlowContext {
 
-    public static final int STATE_ERROR = -1;
-    public static final int STATE_CONNECTING = 0;
-    public static final int STATE_CONNECTED = 1;
-    public static final int STATE_REQ_SENT = 2;
-    public static final int STATE_RESP_RECEIVED = 3;
-    public static final int STATE_DISCONNECTING = 4;
-    public static final int STATE_DISCONNECTED = 5;
+    public static final byte STATE_ERROR = -1;
+    public static final byte STATE_CONNECTING = 0;
+    public static final byte STATE_CONNECTED = 1;
+    /*    public static final byte STATE_REQ_SENT = 2;
+        public static final byte STATE_RESP_RECEIVED = 3;*/
+    public static final byte STATE_DISCONNECTING = 4;
+    public static final byte STATE_DISCONNECTED = 5;
 
     public static final int INIT_BUFFER_CAPACITY = 1024;
 
@@ -33,13 +33,13 @@
 
     protected SessionPayloadEvent sentEvent;
 
-    protected int state = STATE_CONNECTING;
+    protected byte state = STATE_CONNECTING;
 
     protected long timeout = -1;
 
-    protected long receivedStartTimestamp = -1;
+    protected long receivedStartTime = -1;
 
-    protected long sendStartTimestamp = -1;
+    protected long sendStartTime = -1;
 
     private int loop;
 
@@ -55,6 +55,8 @@
 
     protected int sendErrors;
 
+    protected FlowError lastError;
+
     @Deprecated
     protected DataDecoder decoder;
 
@@ -88,11 +90,11 @@
         this.client = client;
     }
 
-    public void state(int state) {
+    public void state(byte state) {
         this.state = state;
     }
 
-    public int state() {
+    public byte state() {
         return state;
     }
 
@@ -105,25 +107,29 @@
     }
 
     public long receivedStartTimestamp() {
-        return receivedStartTimestamp;
+        return receivedStartTime;
     }
 
     public void receivedStartTimestamp(long receivedStartTimestamp) {
-        this.receivedStartTimestamp = receivedStartTimestamp;
+        this.receivedStartTime = receivedStartTimestamp;
     }
 
     public long sendStartTimestamp() {
-        return sendStartTimestamp;
+        return sendStartTime;
     }
 
     public void sendStartTimestamp(long sendStartTimestamp) {
-        this.sendStartTimestamp = sendStartTimestamp;
+        this.sendStartTime = sendStartTimestamp;
     }
 
     public SessionPayloadEvent sentEvent() {
         return sentEvent;
     }
 
+    public boolean isEventSent() {
+        return (sentEvent != null);
+    }
+
     public void sentEvent(SessionPayloadEvent sentEvent) {
         this.sentEvent = sentEvent;
     }
@@ -169,6 +175,10 @@
         return blocked;
     }
 
+    public FlowError getLastError() {
+        return lastError;
+    }
+
     public Object getParam(String name) {
         if (params == null) {
             return null;
@@ -235,10 +245,10 @@
                 return "connecting";
             case STATE_CONNECTED:
                 return "connected";
-            case STATE_REQ_SENT:
+/*            case STATE_REQ_SENT:
                 return "req_sent";
             case STATE_RESP_RECEIVED:
-                return "resp_received";
+                return "resp_received";*/
             case STATE_DISCONNECTING:
                 return "disconnecting";
             case STATE_DISCONNECTED:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java	Mon Mar 30 13:41:01 2020 +0200
@@ -0,0 +1,115 @@
+package com.passus.st.client;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.ConnectException;
+import java.net.SocketException;
+
+public class FlowError {
+
+    public static final int CODE_UNKNOWN_ERROR = 0;
+    public static final int CODE_CONNECTION_ERROR = 1;
+    public static final int CODE_CONNECTION_REFUSED = 2;
+    public static final int CODE_CONNECTION_TIMEOUT = 3;
+    public static final int CODE_UNKNOWN_HOST = 4;
+    public static final int CODE_NO_ROUTE_TO_HOST = 5;
+    public static final int CODE_NETWORK_UNREACHABLE = 6;
+
+    public static final int CODE_READ_TIMEOUT = 7;
+    public static final int CODE_INVALID_RESPONSE = 8;
+    public static final int CODE_CONNECTION_CLOSED_UNEXPECTEDLY = 9;
+    public static final int CODE_CONNECTION_RESET_BY_PEER = 10;
+    public static final int CODE_SOCKET_ERROR = 11;
+    public static final int CODE_IO_ERROR = 12;
+
+    public static final int CODE_SSL_ERROR = 13;
+    public static final int CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 14;
+    public static final int CODE_SSL_HANDSHAKE_ERROR = 15;
+
+    private final int code;
+
+    private final String message;
+
+    private final Exception cause;
+
+    public FlowError(int code, String message) {
+        this(code, message, null);
+    }
+
+    public FlowError(int code, String message, Exception cause) {
+        this.code = code;
+        this.message = message;
+        this.cause = cause;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public Exception getCause() {
+        return cause;
+    }
+
+    public static FlowError interpret(Exception ex, String stackTrace) {
+        if (stackTrace.contains("Connection refused")) {
+            return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex);
+        } else if (stackTrace.contains("connect timed out")) {
+            return new FlowError(CODE_CONNECTION_TIMEOUT, "Connection timed out.", ex);
+        } else if (stackTrace.contains("Read timed out")) {
+            return new FlowError(CODE_READ_TIMEOUT, "Response read timed out.", ex);
+        } else if (stackTrace.contains("Invalid Http response")) {
+            return new FlowError(CODE_INVALID_RESPONSE, "Invalid response.", ex);
+        } else if (stackTrace.contains("UnknownHostException")) {
+            return new FlowError(CODE_UNKNOWN_HOST, "Unknown host. Unable to resolve host name.", ex);
+        } else if (stackTrace.contains("No route to host")) {
+            return new FlowError(CODE_NO_ROUTE_TO_HOST, "No route to host.", ex);
+        } else if (stackTrace.contains("Network is unreachable")) {
+            return new FlowError(CODE_NETWORK_UNREACHABLE, "Network is unreachable.", ex);
+        } else if (stackTrace.contains("Software caused connection abort")) {
+            return new FlowError(CODE_CONNECTION_CLOSED_UNEXPECTEDLY, "Connection closed unexpectedly.", ex);
+        } else if (stackTrace.contains("Connection reset by peer")) {
+            return new FlowError(CODE_CONNECTION_RESET_BY_PEER, "Connection reset by peer.", ex);
+        } else if (stackTrace.contains("Unrecognized SSL message")) {
+            return new FlowError(CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR, "Unrecognized SSL message. Probably plaintext connection.", ex);
+        } else if (stackTrace.contains("SSLHandshakeException")) {
+            return new FlowError(CODE_SSL_HANDSHAKE_ERROR, "SSL handshake error.", ex);
+        }
+
+        Throwable cause = ex.getCause();
+        if (cause instanceof SSLException) {
+            return new FlowError(CODE_SSL_ERROR, "SSL error.", ex);
+        } else if (cause instanceof ConnectException) {
+            return new FlowError(CODE_CONNECTION_ERROR, "Connection error.", ex);
+        } else if (cause instanceof SocketException) {
+            return new FlowError(CODE_SOCKET_ERROR, "Networking socket error.", ex);
+        } else if (cause instanceof IOException) {
+            return new FlowError(CODE_IO_ERROR, "Networking IO error.", ex);
+        }
+
+        return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex);
+    }
+
+    public static FlowError interpret(Exception ex) {
+        try {
+            String stackTrace;
+            try (StringWriter sw = new StringWriter();
+                 PrintWriter w = new PrintWriter(sw)) {
+                ex.printStackTrace(w);
+                stackTrace = sw.toString();
+            }
+
+            return interpret(ex, stackTrace);
+        } catch (IOException ignore) {
+
+        }
+
+        return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", ex);
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Mon Mar 30 13:41:01 2020 +0200
@@ -12,14 +12,12 @@
 
 public class FlowUtils {
 
-    public static final Map<Integer, Long> DEFAULT_TIMEOUTS;
+    public static final Map<Byte, Long> DEFAULT_TIMEOUTS;
 
     static {
-        Map<Integer, Long> defaultTimeouts = new HashMap<>();
+        Map<Byte, Long> defaultTimeouts = new HashMap<>();
         defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L);
         defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L);
-        defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L);
-        defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L);
         defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE);
         defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L);
         defaultTimeouts.put(STATE_DISCONNECTED, 0L);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Mar 30 13:41:01 2020 +0200
@@ -9,8 +9,8 @@
 import com.passus.st.emitter.Emitter;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.metric.MetricsContainer;
-import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
-import it.unimi.dsi.fastutil.ints.Int2LongMap;
+import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap;
+import it.unimi.dsi.fastutil.bytes.Byte2LongMap;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -29,7 +29,7 @@
 
     private final Set<SessionInfo> blockedSessions = new HashSet<>();
 
-    private final Int2LongMap timeouts = new Int2LongArrayMap();
+    private final Byte2LongMap timeouts = new Byte2LongArrayMap();
 
     protected final Object lock = new Object();
 
@@ -264,17 +264,29 @@
         updateFlowState(flowContext, STATE_ERROR);
     }
 
-    private void responseReceived(FlowContext flowContext) {
+    private void requestSent0(FlowContext flowContext) {
+        requestSent(flowContext);
+    }
+
+    protected void requestSent(FlowContext flowContext) {
+
+    }
+
+    private void responseReceived0(FlowContext flowContext) {
+        responseReceived(flowContext);
         flowContext.sentEvent(null);
         flowContext.receivedStartTimestamp(-1);
-        updateFlowState(flowContext, STATE_RESP_RECEIVED);
     }
 
-    private void updateFlowState(FlowContext flowContext, int newState) {
+    protected void responseReceived(FlowContext flowContext) {
+
+    }
+
+    private void updateFlowState(FlowContext flowContext, byte newState) {
         updateFlowState(flowContext, newState, flowContext.state);
     }
 
-    private void updateFlowState(FlowContext flowContext, int newState, int oldState) {
+    private void updateFlowState(FlowContext flowContext, byte newState, byte oldState) {
         if (logger.isDebugEnabled()) {
             debug(flowContext, "Flow status changing {} -> {}.",
                     contextStateToString(flowContext.state()),
@@ -430,7 +442,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    responseReceived(flowContext);
+                    responseReceived0(flowContext);
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
                     if (collectMetric) {
                         synchronized (metric) {
@@ -454,7 +466,7 @@
                     }
 
                     decoder.clear(flowContext);
-                    responseReceived(flowContext);
+                    responseReceived0(flowContext);
                 }
             } catch (Exception e) {
                 if (collectMetric) {
@@ -498,7 +510,7 @@
 
                 flowContext.client().onDataWriteEnd(flowContext);
                 if (!flowContext.isBidirectional()) {
-                    responseReceived(flowContext);
+                    responseReceived0(flowContext);
                 }
             }
 
@@ -576,9 +588,9 @@
                 }
 
                 try {
-                    updateFlowState(flowContext, FlowContext.STATE_REQ_SENT);
                     flowContext.sentEvent(event);
                     flowContext.channelContext().writeAndFlush(buffer);
+                    requestSent0(flowContext);
                     buffer.clear();
                     return true;
                 } catch (Exception e) {
@@ -612,17 +624,8 @@
                             switch (flowContext.state()) {
                                 case FlowContext.STATE_CONNECTING:
                                 case FlowContext.STATE_CONNECTED:
-                                case FlowContext.STATE_REQ_SENT:
-                                case FlowContext.STATE_ERROR:
                                     disconnect(flowContext);
                                     break;
-                                case FlowContext.STATE_RESP_RECEIVED:
-                                    //Dziwny blad nie powinien wystepowac
-                                    break;
-                                case FlowContext.STATE_DISCONNECTING:
-                                case STATE_DISCONNECTED:
-                                    removeFlowContext(flowContext);
-                                    break;
                             }
                         }
                     }
@@ -645,5 +648,4 @@
         }
     }
 
-
 }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Mon Mar 30 13:41:01 2020 +0200
@@ -109,7 +109,7 @@
             while (it.hasNext()) {
                 LocalFlowContext indexFlowContext = it.next();
                 if (indexFlowContext.eventsQueue.isEmpty()
-                        && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                        && !indexFlowContext.isEventSent()) {
                     disconnect(flowContext);
                     if (--diff == 0) {
                         break;
@@ -122,25 +122,21 @@
     }
 
     private boolean canSend(FlowContext flowContext) {
-        int state = flowContext.state();
-        return (state == FlowContext.STATE_CONNECTED
-                || state == FlowContext.STATE_RESP_RECEIVED
-                || state == FlowContext.STATE_ERROR
-                || state == FlowContext.STATE_REQ_SENT);
+        return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent();
     }
 
     @Override
     protected void flowStateChanged(FlowContext flowContext, int oldState) {
         LocalFlowContext localFlowContext = (LocalFlowContext) flowContext;
-        if (oldState == FlowContext.STATE_REQ_SENT) {
+        /*if (oldState == FlowContext.STATE_REQ_SENT) {
             if (semaphore.availablePermits() <= maxSentRequests) {
                 semaphore.release();
             }
-        }
+        }*/
 
         if (closeAllConnections) {
             if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                    && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                    && !localFlowContext.isEventSent()
                     && localFlowContext.eventsQueue.isEmpty()) {
                 disconnect(flowContext);
                 return;
@@ -149,7 +145,7 @@
 
         if (localFlowContext.state() >= FlowContext.STATE_CONNECTED
                 && localFlowContext.state() < FlowContext.STATE_DISCONNECTING
-                && localFlowContext.state() != FlowContext.STATE_REQ_SENT
+                && !localFlowContext.isEventSent()
                 && !localFlowContext.eventsQueue.isEmpty()) {
 
             Event event = localFlowContext.eventsQueue.peek();
@@ -216,7 +212,7 @@
                         LocalFlowContext flowContext = flowContext((SessionEvent) event);
                         if (flowContext != null) {
                             if (flowContext.eventsQueue.isEmpty()
-                                    && flowContext.state() != FlowContext.STATE_REQ_SENT) {
+                                    && !flowContext.isEventSent()) {
                                 disconnect(statusEvent);
                             } else {
                                 addToQueue(flowContext, event);
@@ -233,7 +229,7 @@
                             if (flowContext.eventsQueue.isEmpty()
                                     && (flowContext.state() == FlowContext.STATE_CONNECTED
                                     || flowContext.state() == FlowContext.STATE_ERROR
-                                    || flowContext.state() == FlowContext.STATE_RESP_RECEIVED)) {
+                                    || flowContext.isEventSent())) {
                                 send(flowContext, payloadEvent);
                             } else {
                                 addToQueue(flowContext, event);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Mon Mar 30 13:41:01 2020 +0200
@@ -54,7 +54,6 @@
 
         if (context == currFlowContext) {
             if (context.state() == FlowContext.STATE_CONNECTED
-                    || context.state() == FlowContext.STATE_RESP_RECEIVED
                     || context.state() == FlowContext.STATE_ERROR
                     || context.state() == FlowContext.STATE_DISCONNECTED) {
                 currFlowContext = null;
@@ -83,7 +82,7 @@
             do {
                 wait = false;
                 for (FlowContext flowContext : sessions.values()) {
-                    if (flowContext.state() == FlowContext.STATE_REQ_SENT) {
+                    if (flowContext.isEventSent()) {
                         wait = true;
                         break;
                     }
@@ -143,7 +142,7 @@
                     } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                         currFlowContext = flowContext((SessionEvent) event);
                         if (currFlowContext != null) {
-                            if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) {
+                            if (!currFlowContext.isEventSent()) {
                                 disconnect(statusEvent);
                             }
                         }
@@ -159,7 +158,6 @@
                     if (flowContext != null) {
                         switch (flowContext.state()) {
                             case FlowContext.STATE_CONNECTED:
-                            case FlowContext.STATE_RESP_RECEIVED:
                             case FlowContext.STATE_ERROR:
                                 currFlowContext = flowContext;
                                 if (send(flowContext, (SessionPayloadEvent) event)) {
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java	Mon Mar 30 13:41:01 2020 +0200
@@ -58,7 +58,7 @@
 
             events.forEach(worker::handle);
 
-            worker.join();
+            worker.join(2_000);
             assertTrue(listner.size() > 0);
             assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
             TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0);
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Mar 26 13:27:52 2020 +0100
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Mon Mar 30 13:41:01 2020 +0200
@@ -53,7 +53,7 @@
         try {
             flowExecutor.start();
             events.forEach(flowExecutor::handle);
-            flowExecutor.join();
+            flowExecutor.join(2_000);
 
             assertEquals(0, listener.getErrors());
             assertEquals(2, listener.getReceived().size());