changeset 1059:1d47e67bad78

FlowContext - events queue manipulation + PgSql processing enhanced
author Devel 2
date Thu, 16 Apr 2020 12:52:37 +0200
parents 8535ce392b5e
children efe41a7d7272
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFlowUtils.java stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlFilter.java stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlLoginFilter.java stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java stress-tester/src/main/java/com/passus/st/emitter/Task.java stress-tester/src/main/java/com/passus/st/job/TestJob.java
diffstat 12 files changed, 173 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Thu Apr 16 12:52:37 2020 +0200
@@ -6,6 +6,8 @@
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
+import java.util.Collection;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.locks.Condition;
@@ -26,6 +28,8 @@
 
     protected final SessionInfo session;
 
+    protected final Deque<Event> eventQeueu;
+
     protected volatile boolean blocked;
 
     protected volatile boolean sessionEstablishedSeen;
@@ -74,7 +78,12 @@
     private Map<String, Object> params;
 
     public FlowContext(SessionInfo session) {
+        this(session, null);
+    }
+
+    public FlowContext(SessionInfo session, Deque<Event> eventQeueu) {
         this.session = session;
+        this.eventQeueu = eventQeueu;
     }
 
     public void createLock() {
@@ -147,6 +156,10 @@
         return timeout;
     }
 
+    public void block() {
+        blocked = true;
+    }
+
     public boolean isBlocked() {
         return blocked;
     }
@@ -177,16 +190,24 @@
         this.writeStartTime = sendStartTimestamp;
     }
 
+    public boolean isEventSent() {
+        return (sentEvent != null);
+    }
+
     public SessionPayloadEvent sentEvent() {
         return sentEvent;
     }
 
-    public boolean isEventSent() {
-        return (sentEvent != null);
+    public void sentEvent(SessionPayloadEvent sentEvent) {
+        this.sentEvent = sentEvent;
     }
 
-    public void sentEvent(SessionPayloadEvent sentEvent) {
-        this.sentEvent = sentEvent;
+    public Object sentRequest() {
+        if (sentEvent != null) {
+            return sentEvent.getRequest();
+        }
+
+        return null;
     }
 
     @Deprecated
@@ -226,6 +247,50 @@
         return session;
     }
 
+    public void queueAddFirst(Event[] events) {
+        if (eventQeueu != null) {
+            for (Event event : events) {
+                eventQeueu.addFirst(event);
+            }
+        }
+    }
+
+    public void queueAddFirst(Collection<Event> events) {
+        if (eventQeueu != null) {
+            for (Event event : events) {
+                eventQeueu.addFirst(event);
+            }
+        }
+    }
+
+    public void queueAddFirst(Event event) {
+        if (eventQeueu != null) {
+            eventQeueu.addFirst(event);
+        }
+    }
+
+    public void queueAddLast(Event[] events) {
+        if (eventQeueu != null) {
+            for (Event event : events) {
+                eventQeueu.addLast(event);
+            }
+        }
+    }
+
+    public void queueAddLast(Collection<Event> events) {
+        if (eventQeueu != null) {
+            for (Event event : events) {
+                eventQeueu.addLast(event);
+            }
+        }
+    }
+
+    public void queueAddLast(Event event) {
+        if (eventQeueu != null) {
+            eventQeueu.addLast(event);
+        }
+    }
+
     public Object getParam(String name) {
         if (params == null) {
             return null;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Thu Apr 16 12:52:37 2020 +0200
@@ -402,6 +402,9 @@
                 buffer = flowContext.buffer();
                 try {
                     encoder.encode(req, flowContext, buffer);
+                    if (trace) {
+                        logger.trace("Request encoded: " + req);
+                    }
                 } catch (Exception e) {
                     flowContext.encoderErrors++;
                     if (logger.isDebugEnabled()) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Apr 16 12:52:37 2020 +0200
@@ -148,7 +148,7 @@
     }
 
     protected FlowContext createFlowContext(SessionInfo session) {
-        FlowContext flowContext = new FlowContext(session);
+        FlowContext flowContext = new FlowContext(session, null);
         flowContext.createLock();
         return flowContext;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java	Thu Apr 16 12:52:37 2020 +0200
@@ -112,7 +112,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session);
+        FlowContext flowContext = new FlowContext(session, eventsQueue);
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Thu Apr 16 12:52:37 2020 +0200
@@ -44,8 +44,6 @@
 
     public SynchFlowWorker(Emitter emitter, String name, int index) {
         super(emitter, name, index);
-        SynchWrapper supervisor = new SynchWrapper(this);
-        flowProcessor = new FlowProcessor(supervisor, logger, index);
     }
 
     @Override
@@ -89,7 +87,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session);
+        FlowContext flowContext = new FlowContext(session, eventsQueue);
         flowContext.createLock();
 
         FlowHandler client = clientFactory.create(session.getProtocolId());
@@ -263,6 +261,9 @@
     @Override
     public void run() {
         working = true;
+        SynchWrapper supervisor = new SynchWrapper(this);
+        flowProcessor = new FlowProcessor(supervisor, logger, index);
+
         while (working) {
             Event event = null;
             try {
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFlowUtils.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFlowUtils.java	Thu Apr 16 12:52:37 2020 +0200
@@ -20,7 +20,7 @@
     }
 
     public static FlowContext createFlowContext(SessionInfo session, HttpScopes scopes) {
-        FlowContext context = new FlowContext(session);
+        FlowContext context = new FlowContext(session, null);
         context.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(context, scopes));
         return context;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java	Thu Apr 16 12:52:37 2020 +0200
@@ -3,16 +3,23 @@
 import com.passus.commons.Assert;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
+import com.passus.net.pgsql.PgSqlErrorResponseMessage;
 import com.passus.net.pgsql.PgSqlMessage;
 import com.passus.net.pgsql.PgSqlMessageType;
 import com.passus.net.pgsql.PgSqlSimpleQueryMessage;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
 
 import static com.passus.st.Protocols.NETFLOW;
 
 public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric, PgSqlMessage, PgSqlMessage> implements TimeAware {
 
+    private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandlerNetflowFlowHandler.class);
+
     TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
 
     public PgSqlFlowHandlerNetflowFlowHandler() {
@@ -30,10 +37,10 @@
     }
 
     @Override
-    protected void onRequestSent0(PgSqlMessage msg, FlowContext flowContext) {
+    protected void onRequestSent0(PgSqlMessage req, FlowContext flowContext) {
         if (collectMetrics) {
-            if (msg.getType() == PgSqlMessageType.SIMPLE_QUERY) {
-                PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) msg;
+            if (req.getType() == PgSqlMessageType.SIMPLE_QUERY) {
+                PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) req;
                 synchronized (metric) {
                     metric.addQuery(simpleQueryMsg.getQuery());
                 }
@@ -41,9 +48,36 @@
         }
     }
 
+    private void disconnectAndBlock(FlowContext flowContext) {
+        try {
+            flowContext.block();
+            flowContext.channelContext().close();
+        } catch (IOException ignore) {
+
+        } finally {
+
+        }
+    }
+
     @Override
-    protected void onResponseReceived0(PgSqlMessage msg, FlowContext flowContext) {
-        //flowContext.channelContext().close();
+    protected void onResponseReceived0(PgSqlMessage resp, FlowContext flowContext) {
+        if (resp.getType() == PgSqlMessageType.ERROR_RESPONSE) {
+            PgSqlErrorResponseMessage errorMsg = (PgSqlErrorResponseMessage) resp;
+            PgSqlMessage req = (PgSqlMessage) flowContext.sentRequest();
+            if (req.getType() == PgSqlMessageType.STARTUP_MESSAGE) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("PgSql auth failed. Server message " + errorMsg);
+                }
+
+                disconnectAndBlock(flowContext);
+            } else if(errorMsg.isConnectionExceptionClass()) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Fatal error. Server message " + errorMsg);
+                }
+
+                disconnectAndBlock(flowContext);
+            }
+        }
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlFilter.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlFilter.java	Thu Apr 16 12:52:37 2020 +0200
@@ -5,7 +5,6 @@
 import com.passus.st.filter.FlowFilter;
 
 /**
- *
  * @author mikolaj.podbielski
  */
 public abstract class PgSqlFilter implements FlowFilter {
@@ -24,13 +23,13 @@
 
     @Override
     public int filterOutbound(Object req, Object resp, FlowContext context) {
-        if (req instanceof PgSqlMessage) {
-            return filterOutbound((PgSqlMessage) req, resp, context);
+        if (req instanceof PgSqlMessage || resp instanceof PgSqlMessage) {
+            return filterOutbound((PgSqlMessage) req, (PgSqlMessage) resp, context);
         }
         return DUNNO;
     }
 
-    public int filterOutbound(PgSqlMessage req, Object resp, FlowContext context) {
+    public int filterOutbound(PgSqlMessage req, PgSqlMessage resp, FlowContext context) {
         return DUNNO;
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlLoginFilter.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlLoginFilter.java	Thu Apr 16 12:52:37 2020 +0200
@@ -4,29 +4,25 @@
 import com.passus.config.Configuration;
 import com.passus.config.ConfigurationContext;
 import com.passus.config.annotations.NodeDefinitionCreate;
-import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
-import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
-import com.passus.config.schema.MapNodeDefinition;
 import com.passus.config.schema.NodeDefinition;
 import com.passus.config.schema.NodeDefinitionCreator;
-import com.passus.net.pgsql.PgSqlAuthRequestMessage;
-import com.passus.net.pgsql.PgSqlAuthRequestMessageMD5;
-import com.passus.net.pgsql.PgSqlMessage;
-import com.passus.net.pgsql.PgSqlPasswordMessage;
+import com.passus.net.pgsql.*;
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.credentials.Credentials;
 import com.passus.st.client.credentials.CredentialsProvider;
 import com.passus.st.filter.CredentialsProviderNodeDefinition;
 import com.passus.st.plugin.PluginConstants;
-import static com.passus.st.utils.HashUtils.HEX_L;
-import static com.passus.st.utils.HashUtils.getMD5;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+import static com.passus.st.utils.HashUtils.HEX_L;
+import static com.passus.st.utils.HashUtils.getMD5;
+
 /**
- *
  * @author mikolaj.podbielski
  */
 @NodeDefinitionCreate(PgSqlLoginFilter.PgSqlLoginFilterNodeDefinitionCreator.class)
@@ -39,21 +35,25 @@
 
     protected CredentialsProvider credentialsProvider;
 
+    protected String dbName;
+
     @Override
     public PgSqlLoginFilter instanceForWorker(int index) {
         PgSqlLoginFilter worker = new PgSqlLoginFilter();
         worker.credentialsProvider = credentialsProvider;
+        worker.dbName = dbName;
         return worker;
     }
 
     @Override
     public void configure(Configuration config, ConfigurationContext context) {
-        credentialsProvider = config.get("provider", null);
+        credentialsProvider = (CredentialsProvider) config.get("provider");
+        dbName = (String) config.get("dbName");
     }
 
     @Override
     public int filterInbound(Object req, PgSqlMessage resp, FlowContext context) {
-        if (resp instanceof PgSqlAuthRequestMessage) {
+        if (resp.getType() == PgSqlMessageType.AUTH_REQUEST) {
             PgSqlAuthRequestMessage authReq = (PgSqlAuthRequestMessage) resp;
             context.setParam("authReq", authReq);
         }
@@ -61,30 +61,40 @@
     }
 
     @Override
-    public int filterOutbound(PgSqlMessage req, Object resp, FlowContext context) {
-        if (req instanceof PgSqlPasswordMessage) {
+    public int filterOutbound(PgSqlMessage req, PgSqlMessage resp, FlowContext context) {
+        if (req.getType() == PgSqlMessageType.STARTUP_MESSAGE) {
+            PgSqlStartupMessage startupMsg = (PgSqlStartupMessage) req;
+            Credentials credentials = credentialsProvider.getCredentials(context);
+            if (dbName != null) {
+                startupMsg.setParameter(PgSqlStartupMessage.PARAM_DATABASE, dbName);
+            }
+
+            startupMsg.setParameter(PgSqlStartupMessage.PARAM_USER, credentials.getUser());
+            context.setParam("authCredentials", credentials);
+        } else if (req.getType() == PgSqlMessageType.PASSWORD_MESSAGE) {
             PgSqlPasswordMessage passwordReq = (PgSqlPasswordMessage) req;
             PgSqlAuthRequestMessage authReq = context.getParamValue("authReq");
-            PgSqlAuthRequestMessage.AuthType authType = authReq.getAuthType();
-
-            Credentials credentials = credentialsProvider.getCredentials(context);
-            String password = credentials.getPassword();
-            String user = credentials.getUser();
+            Credentials credentials = context.getParamValue("authCredentials");
+            if (authReq != null && credentials != null) {
+                PgSqlAuthRequestMessage.AuthType authType = authReq.getAuthType();
+                String password = credentials.getPassword();
+                String user = credentials.getUser();
 
-            switch (authType) {
-                case PLAINTEXT:
-                    passwordReq.setPassword(password);
-                    break;
-                case MD5:
-                    PgSqlAuthRequestMessageMD5 authReqMD5 = (PgSqlAuthRequestMessageMD5) authReq;
-                    byte[] salt = authReqMD5.getSalt();
-                    password = hashMD5(user, password, salt);
-                    passwordReq.setPassword(password);
-                    break;
-                default:
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Unsupported auth type: " + authType);
-                    }
+                switch (authType) {
+                    case PLAINTEXT:
+                        passwordReq.setPassword(password);
+                        break;
+                    case MD5:
+                        PgSqlAuthRequestMessageMD5 authReqMD5 = (PgSqlAuthRequestMessageMD5) authReq;
+                        byte[] salt = authReqMD5.getSalt();
+                        password = hashMD5(user, password, salt);
+                        passwordReq.setPassword(password);
+                        break;
+                    default:
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Unsupported auth type: " + authType);
+                        }
+                }
             }
         }
         return DUNNO;
@@ -137,9 +147,10 @@
 
         @Override
         public NodeDefinition create() {
-            MapNodeDefinition mapDef = mapDef();
-            mapDef.add(tupleDef("provider", CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true));
-            return mapDef;
+            return mapDef(
+                    tupleDef("provider", CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true),
+                    tupleDef("dbName", valueDef()).setRequired(false)
+            );
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java	Thu Apr 16 12:52:37 2020 +0200
@@ -7,6 +7,7 @@
     public static final Task READ_TASK = new Task(Task.READ);
     public static final Task FLUSH_TASK = new Task(Task.FLUSH);
     public static final Task CLOSE_TASK = new Task(Task.CLOSE);
+    public static final Task CLOSE_AND_BLOCK_TASK = new Task(Task.CLOSE_AND_BLOCK_TASK);
 
     private StatelessTasks() {
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/Task.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/Task.java	Thu Apr 16 12:52:37 2020 +0200
@@ -7,6 +7,7 @@
     public static final int READ = 3;
     public static final int FLUSH = 4;
     public static final int CLOSE = 5;
+    public static final int CLOSE_AND_BLOCK_TASK = 6;
 
     public final int code;
 
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Thu Apr 16 09:41:15 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java	Thu Apr 16 12:52:37 2020 +0200
@@ -204,7 +204,7 @@
             }
 
             // TODO: czy dodać filtry top-level do klienta???
-            flowExecutor.addFilter(httpVarsFilter);
+            filters.forEach(flowExecutor::addFilter);
         }
 
     }