changeset 1085:351208f87e5d

OperationEvent, FlowContext.eventQueue
author Devel 2
date Wed, 06 May 2020 15:34:24 +0200
parents 415b366668f6
children 969427aad169
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/OperationEvent.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/test/java/com/passus/st/client/EventTest.java
diffstat 14 files changed, 226 insertions(+), 138 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Wed May 06 15:34:24 2020 +0200
@@ -6,10 +6,7 @@
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -40,6 +37,8 @@
 
     protected SessionPayloadEvent sentEvent;
 
+    protected Object receivedResponse;
+
     protected byte state = STATE_NEW;
 
     protected long connectionTime = -1;
@@ -78,7 +77,7 @@
     private Map<String, Object> params;
 
     public FlowContext(SessionInfo session) {
-        this(session, null);
+        this(session, new LinkedList<>());
     }
 
     public FlowContext(SessionInfo session, Deque<Event> eventQeueu) {
@@ -210,6 +209,30 @@
         return null;
     }
 
+    /**
+     * Returns the response most recently recorded for this flow.
+     * <p>
+     * The value is cast to whatever type the caller asks for without any
+     * runtime check, so a mismatched {@code T} fails with a
+     * {@link ClassCastException} at the call site rather than here.
+     *
+     * @param <T> type the caller expects the stored response to have
+     * @return the stored response, or {@code null} when none is recorded
+     */
+    @SuppressWarnings("unchecked") // the caller chooses T; nothing can be verified here
+    public <T> T getReceivedResponse() {
+        return (T) receivedResponse;
+    }
+
+    /**
+     * Records the response received for this flow.
+     *
+     * @param receivedResponse response to store; {@code null} clears it
+     */
+    public void setReceivedResponse(Object receivedResponse) {
+        this.receivedResponse = receivedResponse;
+    }
+
     @Deprecated
     public DataDecoder decoder() {
         return decoder;
@@ -248,47 +255,35 @@
     }
 
     public void queueAddFirst(Event[] events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addFirst(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addFirst(event);
         }
     }
 
     public void queueAddFirst(Collection<Event> events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addFirst(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addFirst(event);
         }
     }
 
     public void queueAddFirst(Event event) {
-        if (eventQeueu != null) {
-            eventQeueu.addFirst(event);
-        }
+        eventQeueu.addFirst(event);
     }
 
     public void queueAddLast(Event[] events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addLast(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addLast(event);
         }
     }
 
     public void queueAddLast(Collection<Event> events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addLast(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addLast(event);
         }
     }
 
     public void queueAddLast(Event event) {
-        if (eventQeueu != null) {
-            eventQeueu.addLast(event);
-        }
+        eventQeueu.addLast(event);
     }
 
     public Object getParam(String name) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Wed May 06 15:34:24 2020 +0200
@@ -250,8 +250,7 @@
             flowContext.flowHandler.onDisconnected(flowContext);
         } finally {
             flowContext.clear();
-            flowContext.signal();
-            flowContext.unlock();
+            flowContext.signalAndUnlock();
         }
     }
 
@@ -273,82 +272,6 @@
         }
     }
 
-    private void responseReceived0(FlowContext flowContext, Object response) {
-        supervisor.onResponseReceived(flowContext, response);
-        flowContext.flowHandler.onResponseReceived(response, flowContext);
-        flowContext.sentEvent(null);
-        flowContext.receivedStartTimestamp(-1);
-    }
-
-    @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        FlowContext flowContext = context.getFlowContext();
-        flowContext.lock();
-        try {
-            try {
-                FlowHandler client = flowContext.client();
-                FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                decoder.decode(data, flowContext);
-
-                long now = System.currentTimeMillis();
-                if (flowContext.receivedStartTimestamp() == -1) {
-                    flowContext.receivedStartTimestamp(now);
-                }
-
-                if (decoder.state() == DataDecoder.STATE_ERROR) {
-                    if (collectMetric) {
-                        synchronized (metric) {
-                            metric.incErrorNum();
-                        }
-                    }
-
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, "Decoder error. " + decoder.getLastError());
-                    }
-
-                    decoder.clear(flowContext);
-                    responseReceived0(flowContext, null);
-                } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                    if (collectMetric) {
-                        synchronized (metric) {
-                            metric.incResponsesNum();
-                            metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
-                        }
-                    }
-
-                    Object resp = decoder.getResult();
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + ".");
-                    }
-
-                    Object req = null;
-                    if (flowContext.sentEvent() != null) {
-                        req = flowContext.sentEvent().getRequest();
-                    }
-
-                    decoder.clear(flowContext);
-                    if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
-                        responseReceived0(flowContext, resp);
-                    }
-                }
-            } catch (Exception e) {
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.incErrorNum();
-                    }
-                }
-
-                if (logger.isDebugEnabled()) {
-                    debug(logger, flowContext, e.getMessage(), e);
-                }
-
-                error(flowContext, FlowError.unknownError());
-            }
-        } finally {
-            flowContext.signalAndUnlock();
-        }
-    }
-
     @Override
     public void dataWriteStart(ChannelContext context) {
         FlowContext flowContext = context.getFlowContext();
@@ -430,6 +353,7 @@
                 }
 
                 try {
+                    flowContext.receivedResponse = null;
                     flowContext.sentEvent = event;
                     flowContext.writeStartTime = -1;
                     flowContext.writeEndTime = -1;
@@ -461,4 +385,110 @@
             flowContext.signalAndUnlock();
         }
     }
+
+    /**
+     * Stores a finished response on the flow context and notifies the listeners.
+     * <p>
+     * The supervisor is always notified. The flow handler is notified only when
+     * the inbound filter chain did not deny the response.
+     * <p>
+     * NOTE(review): {@code FlowUtils.waitForResponse} keeps waiting while
+     * {@code receivedResponse} is {@code null}, and a decoder error passes
+     * {@code null} here - confirm such waiters are meant to run into the timeout.
+     *
+     * @param flowContext flow the response belongs to
+     * @param response    decoded response, or {@code null} after a decoder error
+     * @param blocked     {@code true} when the inbound filters denied the response;
+     *                    the flow handler callback is then skipped
+     */
+    private void responseReceived0(FlowContext flowContext, Object response, boolean blocked) {
+        flowContext.receivedResponse = response;
+        supervisor.onResponseReceived(flowContext, response);
+        if (!blocked) {
+            flowContext.flowHandler.onResponseReceived(response, flowContext);
+        }
+        flowContext.sentEvent = null;
+        flowContext.receivedStartTime = -1;
+    }
+
+    /**
+     * Feeds received bytes to the flow's response decoder and, once the decoder
+     * reports an error or a finished response, hands the result to
+     * {@code responseReceived0}.
+     * <p>
+     * Runs with the flow context locked and calls {@code signalAndUnlock()} on exit.
+     *
+     * @param context channel the data arrived on
+     * @param data    received bytes
+     */
+    @Override
+    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+        FlowContext flowContext = context.getFlowContext();
+        flowContext.lock();
+        try {
+            try {
+                FlowHandler client = flowContext.client();
+                FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
+                decoder.decode(data, flowContext);
+
+                long now = System.currentTimeMillis();
+                // -1 marks that no start time has been recorded yet for this response.
+                if (flowContext.receivedStartTimestamp() == -1) {
+                    flowContext.receivedStartTimestamp(now);
+                }
+
+                if (decoder.state() == DataDecoder.STATE_ERROR) {
+                    if (collectMetric) {
+                        synchronized (metric) {
+                            metric.incErrorNum();
+                        }
+                    }
+
+                    if (logger.isDebugEnabled()) {
+                        debug(logger, flowContext, "Decoder error. " + decoder.getLastError());
+                    }
+
+                    decoder.clear(flowContext);
+                    responseReceived0(flowContext, null, false);
+                } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                    if (collectMetric) {
+                        synchronized (metric) {
+                            metric.incResponsesNum();
+                            metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+                        }
+                    }
+
+                    Object resp = decoder.getResult();
+                    if (logger.isDebugEnabled()) {
+                        debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + ".");
+                    }
+
+                    Object req = null;
+                    if (flowContext.sentEvent() != null) {
+                        req = flowContext.sentEvent().getRequest();
+                    }
+
+                    decoder.clear(flowContext);
+                    // DENY means the filters blocked the response: the supervisor still
+                    // sees it, but the flow handler must not.
+                    boolean blocked = filterChain.filterInbound(req, resp, flowContext) == Filter.DENY;
+                    responseReceived0(flowContext, resp, blocked);
+                }
+            } catch (Exception e) {
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.incErrorNum();
+                    }
+                }
+
+                if (logger.isDebugEnabled()) {
+                    debug(logger, flowContext, e.getMessage(), e);
+                }
+
+                error(flowContext, FlowError.unknownError());
+            }
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Wed May 06 15:34:24 2020 +0200
@@ -56,7 +56,7 @@
 
     public static boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException {
         long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
-        while (flowContext.sentEvent != null && !flowContext.isError()) {
+        while (flowContext.receivedResponse == null && !flowContext.isError()) {
             if (timeNanos <= 0) {
                 return false;
             }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Wed May 06 15:34:24 2020 +0200
@@ -148,7 +148,7 @@
     }
 
     protected FlowContext createFlowContext(SessionInfo session) {
-        FlowContext flowContext = new FlowContext(session, null);
+        FlowContext flowContext = new FlowContext(session);
         flowContext.createLock();
         return flowContext;
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java	Wed May 06 15:34:24 2020 +0200
@@ -0,0 +1,49 @@
+package com.passus.st.client;
+
+import java.io.IOException;
+
+/**
+ * Base class for events that carry an action, run through {@link #execute()}.
+ * <p>
+ * The timestamp starts as the wall-clock time at construction and can be
+ * replaced through {@link #setTimestamp(long)}.
+ */
+public abstract class OperationEvent implements Event {
+
+    /** Type identifier returned by {@link #getType()}. */
+    public static final int TYPE = 13;
+
+    private long timestamp = System.currentTimeMillis();
+
+    public OperationEvent() {
+    }
+
+    @Override
+    public int getType() {
+        return TYPE;
+    }
+
+    /** Always {@code null}. */
+    @Override
+    public String getSourceName() {
+        return null;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Runs the action this event represents.
+     *
+     * @throws IOException if the action fails with an I/O error
+     */
+    public abstract void execute() throws IOException;
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Wed May 06 15:34:24 2020 +0200
@@ -112,7 +112,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session, eventsQueue);
+        FlowContext flowContext = new FlowContext(session);
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Wed May 06 15:34:24 2020 +0200
@@ -10,6 +10,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -110,7 +112,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session, eventsQueue);
+        FlowContext flowContext = new FlowContext(session, new LinkedList<>());
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
@@ -202,6 +204,7 @@
             if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
                 flowContext = registerAndConnect(statusEvent.getSessionInfo(), wait);
                 flowContext.sessionEstablishedSeen = true;
+                processFlowEvents(flowContext);
             } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
                 flowContext = flowContext(statusEvent);
                 if (flowContext != null) {
@@ -213,6 +216,38 @@
         }
     }
 
+    /**
+     * Drains the flow's event queue from the front, handling one event at a time.
+     * <p>
+     * Payload events are sent through the flow processor and operation events
+     * are executed; events of any other type are removed and dropped. An
+     * {@link IOException} thrown by an operation is passed to
+     * {@code errorInternal} and draining continues.
+     *
+     * @param flowContext flow whose queued events are processed
+     */
+    protected void processFlowEvents(FlowContext flowContext) {
+        while (!flowContext.eventQeueu.isEmpty()) {
+            Event next = flowContext.eventQeueu.removeFirst();
+            int type = next.getType();
+            if (type == SessionPayloadEvent.TYPE) {
+                flowProcessor.send(flowContext, (SessionPayloadEvent) next, true);
+                continue;
+            }
+
+            if (type != OperationEvent.TYPE) {
+                // Neither a payload nor an operation: nothing to do for it.
+                continue;
+            }
+
+            try {
+                ((OperationEvent) next).execute();
+            } catch (IOException e) {
+                errorInternal(flowContext, e);
+            }
+        }
+    }
+
     protected void sleep(Event event) {
         if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
             if (lastEventTimestamp != -1) {
@@ -243,6 +261,7 @@
             if (event.getType() == SessionStatusEvent.TYPE) {
                 SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
                 processFlowSessionStatusEvent(statusEvent, true);
+
                 return;
             } else if (event.getType() == SessionPayloadEvent.TYPE) {
                 FlowContext flowContext = flowContext(sessEvent);
@@ -261,7 +280,8 @@
                 }
 
                 if (flowContext != null && flowContext.state == STATE_CONNECTED) {
-                    flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
+                    flowContext.queueAddLast(event);
+                    processFlowEvents(flowContext);
                 }
             }
         } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
@@ -333,7 +353,7 @@
 
         @Override
         public void onResponseReceived(FlowContext flowContext, Object response) {
-            fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext);
+            fireResponseReceived(flowContext.sentRequest(), response, flowContext);
         }
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java	Wed May 06 15:34:24 2020 +0200
@@ -0,0 +1,71 @@
+package com.passus.st.client;
+
+import java.io.IOException;
+
+import static com.passus.st.client.FlowUtils.waitForResponse;
+
+/**
+ * Operation event that waits for the flow to receive a response.
+ * <p>
+ * When the wait reports failure the flow's channel is closed, unless that was
+ * disabled at construction.
+ */
+public final class WaitForResponseEvent extends OperationEvent {
+
+    /**
+     * Same value as {@link OperationEvent#TYPE}, which is what the inherited
+     * {@code getType()} reports. Declared {@code final} so the identifier cannot
+     * be reassigned at runtime.
+     */
+    public static final int TYPE = OperationEvent.TYPE;
+
+    private final FlowContext flowContext;
+
+    private final boolean closeOnTimeout;
+
+    /**
+     * Creates an event that closes the channel when the wait fails.
+     *
+     * @param flowContext flow to wait on
+     */
+    public WaitForResponseEvent(FlowContext flowContext) {
+        this(flowContext, true);
+    }
+
+    /**
+     * @param flowContext    flow to wait on
+     * @param closeOnTimeout whether to close the flow's channel when the wait
+     *                       ends without a response
+     */
+    public WaitForResponseEvent(FlowContext flowContext, boolean closeOnTimeout) {
+        this.flowContext = flowContext;
+        this.closeOnTimeout = closeOnTimeout;
+    }
+
+    /**
+     * Waits, with the flow context locked, for {@code waitForResponse} to return.
+     * If it reports failure and {@code closeOnTimeout} is set, the channel is
+     * closed. An interrupt ends the wait early without closing the channel.
+     *
+     * @throws IOException if a call made here fails with one, presumably closing the channel
+     */
+    @Override
+    public void execute() throws IOException {
+        flowContext.lock();
+        try {
+            boolean ok = true;
+            try {
+                ok = waitForResponse(flowContext);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the worker thread instead of losing it.
+                Thread.currentThread().interrupt();
+            }
+
+            if (!ok && closeOnTimeout) {
+                flowContext.channelContext().close();
+            }
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Wed May 06 15:34:24 2020 +0200
@@ -9,15 +9,12 @@
 import com.passus.net.mysql.MySqlQueryCommand;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import com.passus.st.client.WaitForResponseEvent;
 import com.passus.st.client.pgsql.PgSqlFlowHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-
 import static com.passus.st.Protocols.NETFLOW;
-import static com.passus.st.client.FlowUtils.lockAndWaitForResponse;
-import static com.passus.st.client.FlowUtils.waitForResponse;
 
 public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
 
@@ -41,13 +38,7 @@
 
     @Override
     public void onConnected(FlowContext flowContext) {
-        try {
-            //Waiting for greeting
-             waitForResponse(flowContext);
-             //Thread.sleep(5_000);
-        } catch (InterruptedException ignore) {
-
-        }
+        flowContext.queueAddFirst(new WaitForResponseEvent(flowContext));
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java	Wed May 06 15:34:24 2020 +0200
@@ -8,7 +8,7 @@
 
     @Override
     public int filterInbound(Object req, Object resp, FlowContext context) {
-        if (resp instanceof MySqlPacket) {
+        if (req instanceof MySqlPacket || resp instanceof MySqlPacket) {
             return filterInbound((MySqlPacket) req, (MySqlPacket) resp, context);
         }
         return DUNNO;
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Wed May 06 15:34:24 2020 +0200
@@ -39,6 +39,7 @@
     public MySqlLoginFilter instanceForWorker(int index) {
         MySqlLoginFilter worker = new MySqlLoginFilter();
         worker.credentialsProvider = credentialsProvider;
+        worker.defaultAuthPlugin = defaultAuthPlugin;
         return worker;
     }
 
@@ -83,6 +84,7 @@
 
             loginReq.setUsername(credentials.getUser());
             loginReq.setAuthPlugin(authPlugin);
+            loginReq.setAttributes(null);
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed May 06 15:34:24 2020 +0200
@@ -144,8 +144,8 @@
                 logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
             }
 
+            setOpRead(key);
             keyContext.handler.channelActive(keyContext.channelContext);
-            setOpRead(key);
         } catch (Exception ex) {
             logger.error(ex.getMessage(), ex);
         }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed May 06 15:34:24 2020 +0200
@@ -106,9 +106,8 @@
             return;
         }
 
-
         if (logger.isDebugEnabled()) {
-            logger.debug("Readed {}B ({} -> {})", buffer.readableBytes(),
+            logger.debug("Read {}B ({} -> {})", totalRead,
                     channelContext.getLocalAddress(), channelContext.getRemoteAddress());
         }
 
--- a/stress-tester/src/test/java/com/passus/st/client/EventTest.java	Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/EventTest.java	Wed May 06 15:34:24 2020 +0200
@@ -2,24 +2,22 @@
 
 import com.passus.commons.Loader;
 import com.passus.commons.Resolver;
-import java.lang.reflect.Constructor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.lang.reflect.Parameter;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
-import org.testng.Assert;
-import org.testng.annotations.Test;
 
 /**
- *
  * @author mikolaj.podbielski
  */
 public class EventTest {
 
     @Test
-    public void testGetTypeIsUnique() throws ReflectiveOperationException {
+    public void testGetTypeIsUnique() throws Exception {
         Set<Class<?>> classes = find();
         Map<Class, Integer> types = getEventTypes(classes);
         Map<Integer, Class> reverseMap = new LinkedHashMap<>();
@@ -30,7 +28,6 @@
                         + e.getKey().getName() + " " + oldKey.getName());
             }
         }
-
     }
 
     public static void main(String[] args) throws Exception {
@@ -41,14 +38,17 @@
         }
     }
 
-    private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws ReflectiveOperationException {
+    private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws Exception {
         Map<Class, Integer> types = new LinkedHashMap<>(classes.size());
         for (final Class<?> clazz : classes) {
-            Field field = clazz.getDeclaredField("TYPE");
-            if (field.isAccessible()) {
-                types.put(clazz, (Integer) field.get(null));
+            try {
+                Field field = clazz.getDeclaredField("TYPE");
+                if (field.isAccessible()) {
+                    types.put(clazz, (Integer) field.get(null));
+                }
+            } catch (Exception e) {
+                throw new Exception("Unable to process " + clazz.getName() + ". " + e.getMessage(), e);
             }
-
         }
         return types;
     }