changeset 1083:6473e8d9693f

FlowHandler - onConnected, onDisconnected methods + MySql processing in progress
author Devel 2
date Tue, 05 May 2020 15:01:13 +0200
parents b09fd168aeab
children 415b366668f6
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowHandler.java stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowUtils.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java stress-tester/src/main/java/com/passus/st/source/PcapMySqlListener.java
diffstat 9 files changed, 93 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Tue May 05 15:01:13 2020 +0200
@@ -54,7 +54,7 @@
 
     protected int loop;
 
-    protected FlowHandler client;
+    protected FlowHandler flowHandler;
 
     protected boolean bidirectional = true;
 
@@ -125,11 +125,11 @@
     }
 
     public FlowHandler client() {
-        return client;
+        return flowHandler;
     }
 
     public void client(FlowHandler client) {
-        this.client = client;
+        this.flowHandler = client;
     }
 
     public void state(byte state) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java	Tue May 05 15:01:13 2020 +0200
@@ -20,6 +20,14 @@
 
     FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext);
 
+    default void onConnected(FlowContext flowContext) {
+
+    }
+
+    default void onDisconnected(FlowContext flowContext) {
+
+    }
+
     default void onDataWriteStart(FlowContext flowContext) {
 
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Tue May 05 15:01:13 2020 +0200
@@ -219,6 +219,8 @@
                         debug(logger, flowContext, "Error occurred during onConnected calling.", e);
                     }
                 }
+
+                flowContext.flowHandler.onConnected(flowContext);
             } catch (Exception ex) {
                 error(flowContext, ex);
             } finally {
@@ -244,6 +246,8 @@
                     debug(logger, flowContext, "Error occurred during onDisconnected calling.", e);
                 }
             }
+
+            flowContext.flowHandler.onDisconnected(flowContext);
         } finally {
             flowContext.clear();
             flowContext.signal();
@@ -271,7 +275,7 @@
 
     private void responseReceived0(FlowContext flowContext, Object response) {
         supervisor.onResponseReceived(flowContext, response);
-        flowContext.client.onResponseReceived(response, flowContext);
+        flowContext.flowHandler.onResponseReceived(response, flowContext);
         flowContext.sentEvent(null);
         flowContext.receivedStartTimestamp(-1);
     }
@@ -353,7 +357,7 @@
             if (flowContext.sentEvent() != null) {
                 flowContext.writeStartTime = System.currentTimeMillis();
                 flowContext.writeEndTime = -1;
-                flowContext.client.onDataWriteStart(flowContext);
+                flowContext.flowHandler.onDataWriteStart(flowContext);
             }
         } finally {
             flowContext.signalAndUnlock();
@@ -374,7 +378,7 @@
                 }
 
                 flowContext.writeEndTime = now;
-                flowContext.client.onDataWriteEnd(flowContext);
+                flowContext.flowHandler.onDataWriteEnd(flowContext);
             }
         } finally {
             flowContext.signalAndUnlock();
@@ -383,7 +387,7 @@
 
     private void requestSent0(FlowContext flowContext, SessionPayloadEvent event) {
         flowContext.sentEvent = event;
-        flowContext.client.onRequestSent(event.getRequest(), flowContext);
+        flowContext.flowHandler.onRequestSent(event.getRequest(), flowContext);
         supervisor.onRequestSent(flowContext, event);
     }
 
@@ -454,7 +458,7 @@
         } catch (Exception e) {
             error(flowContext, e);
         } finally {
-            flowContext.unlock();
+            flowContext.signalAndUnlock();
         }
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java	Tue May 05 15:01:13 2020 +0200
@@ -41,6 +41,15 @@
         return true;
     }
 
+    public static boolean lockAndWaitForResponse(FlowContext flowContext) throws InterruptedException {
+        flowContext.lock();
+        try {
+            return waitForResponse(flowContext, Timeouts.DEFAULT_TIMEOUT);
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
+
     public static boolean waitForResponse(FlowContext flowContext) throws InterruptedException {
         return waitForResponse(flowContext, Timeouts.DEFAULT_TIMEOUT);
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Tue May 05 15:01:13 2020 +0200
@@ -567,7 +567,7 @@
             if (flowContext.sentEvent() != null) {
                 flowContext.writeStartTime = timeGenerator.currentTimeMillis();
                 flowContext.writeEndTime = -1;
-                flowContext.client.onDataWriteStart(flowContext);
+                flowContext.flowHandler.onDataWriteStart(flowContext);
             }
         } finally {
             flowContext.signalAndUnlock();
@@ -588,7 +588,7 @@
                 }
 
                 flowContext.writeEndTime = now;
-                flowContext.client.onDataWriteEnd(flowContext);
+                flowContext.flowHandler.onDataWriteEnd(flowContext);
             }
         } finally {
             flowContext.signalAndUnlock();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java	Tue May 05 15:01:13 2020 +0200
@@ -85,8 +85,8 @@
         super.writeMetrics(container);
 
         sessions.forEach((s, f) -> {
-            if (f.client != null) {
-                f.client.writeMetrics(container);
+            if (f.flowHandler != null) {
+                f.flowHandler.writeMetrics(container);
             }
         });
 
@@ -325,7 +325,7 @@
         @Override
         public void onDisconnected(FlowContext flowContext) {
             if (collectMetrics) {
-                flowContext.client.writeMetrics(deferredMetrics);
+                flowContext.flowHandler.writeMetrics(deferredMetrics);
             }
 
             sessions.remove(flowContext.session);
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java	Tue May 05 15:01:13 2020 +0200
@@ -13,7 +13,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+
 import static com.passus.st.Protocols.NETFLOW;
+import static com.passus.st.client.FlowUtils.lockAndWaitForResponse;
+import static com.passus.st.client.FlowUtils.waitForResponse;
 
 public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
 
@@ -36,6 +40,17 @@
     }
 
     @Override
+    public void onConnected(FlowContext flowContext) {
+        try {
+            //Waiting for greeting
+             waitForResponse(flowContext);
+             //Thread.sleep(5_000);
+        } catch (InterruptedException ignore) {
+
+        }
+    }
+
+    @Override
     protected void onRequestSent0(MySqlPacket req, FlowContext flowContext) {
         if (collectMetrics) {
             if (req.getPacketType() == MySqlPacketTypes.QUERY_COMMAND) {
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java	Tue May 05 15:01:13 2020 +0200
@@ -13,14 +13,13 @@
 import com.passus.st.client.FlowContext;
 import com.passus.st.client.credentials.Credentials;
 import com.passus.st.client.credentials.CredentialsProvider;
-import com.passus.st.filter.CredentialsProviderNodeDefinition;
 import com.passus.st.plugin.PluginConstants;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
-import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
 import static com.passus.net.mysql.MySqlUtils.passScramble411;
+import static com.passus.st.filter.CredentialsProviderNodeDefinition.providersNodeDef;
 
 @NodeDefinitionCreate(MySqlLoginFilter.MySqlLoginFilterNodeDefinitionCreator.class)
 @Plugin(name = MySqlLoginFilter.TYPE, category = PluginConstants.CATEGORY_FLOW_FILTER)
@@ -28,10 +27,14 @@
 
     private static final Logger LOGGER = LogManager.getLogger(MySqlLoginFilter.class);
 
+    private static final String DEFAULT_AUTH_PLUGIN = "mysql_native_password";
+
     public static final String TYPE = "mySqlLogin";
 
     protected CredentialsProvider credentialsProvider;
 
+    private String defaultAuthPlugin = DEFAULT_AUTH_PLUGIN;
+
     @Override
     public MySqlLoginFilter instanceForWorker(int index) {
         MySqlLoginFilter worker = new MySqlLoginFilter();
@@ -42,32 +45,53 @@
     @Override
     public void configure(Configuration config, ConfigurationContext context) {
         credentialsProvider = (CredentialsProvider) config.get("provider");
+        defaultAuthPlugin = config.getString("defaultAuthPlugin", DEFAULT_AUTH_PLUGIN);
     }
 
     @Override
     public int filterInbound(MySqlPacket req, MySqlPacket resp, FlowContext context) {
         if (resp.getPacketType() == MySqlPacketTypes.GREETINGS) {
-            context.setParam("mysql.greetings", resp);
+            context.setParam("mysql.greeting", resp);
         }
 
         return DUNNO;
     }
 
+    private void modifyLoginReq(MySqlLoginRequest loginReq, MySqlServerGreetingResponse greeting, FlowContext context) {
+        Credentials credentials = credentialsProvider.getCredentials(context);
+        if (credentials != null) {
+            String seed = greeting.getSalt() + greeting.getAuthSalt();
+            String authPlugin = greeting.getAuthPlugin() == null ? defaultAuthPlugin : greeting.getAuthPlugin();
+            String pass = credentials.getPassword();
+
+            switch (authPlugin) {
+                case "mysql_native_password":
+                    byte[] passScramble = passScramble411(
+                            pass.getBytes(),
+                            seed.getBytes()
+                    );
+
+                    loginReq.setPassword(passScramble);
+                    break;
+                case "mysql_clear_password":
+                    loginReq.setPassword(pass.getBytes());
+                    break;
+                default:
+                    LOGGER.warn("Not supported MySql auth plugin '" + authPlugin + "'.");
+                    return;
+            }
+
+            loginReq.setUsername(credentials.getUser());
+            loginReq.setAuthPlugin(authPlugin);
+        }
+    }
+
     @Override
     public int filterOutbound(MySqlPacket req, MySqlPacket resp, FlowContext context) {
         if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) {
-            MySqlServerGreetingResponse greetings = context.getParamValue("mysql.greetings");
-            if (greetings != null) {
-                MySqlLoginRequest loginReq = (MySqlLoginRequest) req;
-                Credentials credentials = credentialsProvider.getCredentials(context);
-                if (credentials != null) {
-                    byte[] passScramble = passScramble411(
-                            credentials.getPassword().getBytes(),
-                            greetings.getAuthSalt().getBytes()
-                    );
-                    loginReq.setUsername(credentials.getUser());
-                    loginReq.setPassword(passScramble);
-                }
+            MySqlServerGreetingResponse greeting = context.getParamValue("mysql.greeting");
+            if (greeting != null) {
+                modifyLoginReq((MySqlLoginRequest) req, greeting, context);
             }
         }
 
@@ -79,9 +103,11 @@
         @Override
         public NodeDefinition create() {
             return mapDef(
-                    tupleDef("provider", CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true)
+                    tupleDef("provider", providersNodeDef()).setRequired(true),
+                    tupleDef("defaultAuthPlugin", valueDef()).setRequired(false)
             );
         }
+
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapMySqlListener.java	Mon May 04 14:47:48 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapMySqlListener.java	Tue May 05 15:01:13 2020 +0200
@@ -16,7 +16,7 @@
     @Override
     public void onMessageReceived(SessionContext context, MySqlPacket msg, long timestamp) {
         //Nie przetwarzamy odpowiedzi
-        if (!msg.isRequest()) {
+        if (msg.isRequest()) {
             firePayloadEvent(msg, null, context, timestamp);
         }
     }