Mercurial > stress-tester
changeset 1083:6473e8d9693f
FlowHandler: add onConnected and onDisconnected methods; MySql processing (work in progress)
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Tue May 05 15:01:13 2020 +0200 @@ -54,7 +54,7 @@ protected int loop; - protected FlowHandler client; + protected FlowHandler flowHandler; protected boolean bidirectional = true; @@ -125,11 +125,11 @@ } public FlowHandler client() { - return client; + return flowHandler; } public void client(FlowHandler client) { - this.client = client; + this.flowHandler = client; } public void state(byte state) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Tue May 05 15:01:13 2020 +0200 @@ -20,6 +20,14 @@ FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext); + default void onConnected(FlowContext flowContext) { + + } + + default void onDisconnected(FlowContext flowContext) { + + } + default void onDataWriteStart(FlowContext flowContext) { }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Tue May 05 15:01:13 2020 +0200 @@ -219,6 +219,8 @@ debug(logger, flowContext, "Error occurred during onConnected calling.", e); } } + + flowContext.flowHandler.onConnected(flowContext); } catch (Exception ex) { error(flowContext, ex); } finally { @@ -244,6 +246,8 @@ debug(logger, flowContext, "Error occurred during onDisconnected calling.", e); } } + + flowContext.flowHandler.onDisconnected(flowContext); } finally { flowContext.clear(); flowContext.signal(); @@ -271,7 +275,7 @@ private void responseReceived0(FlowContext flowContext, Object response) { supervisor.onResponseReceived(flowContext, response); - flowContext.client.onResponseReceived(response, flowContext); + flowContext.flowHandler.onResponseReceived(response, flowContext); flowContext.sentEvent(null); flowContext.receivedStartTimestamp(-1); } @@ -353,7 +357,7 @@ if (flowContext.sentEvent() != null) { flowContext.writeStartTime = System.currentTimeMillis(); flowContext.writeEndTime = -1; - flowContext.client.onDataWriteStart(flowContext); + flowContext.flowHandler.onDataWriteStart(flowContext); } } finally { flowContext.signalAndUnlock(); @@ -374,7 +378,7 @@ } flowContext.writeEndTime = now; - flowContext.client.onDataWriteEnd(flowContext); + flowContext.flowHandler.onDataWriteEnd(flowContext); } } finally { flowContext.signalAndUnlock(); @@ -383,7 +387,7 @@ private void requestSent0(FlowContext flowContext, SessionPayloadEvent event) { flowContext.sentEvent = event; - flowContext.client.onRequestSent(event.getRequest(), flowContext); + flowContext.flowHandler.onRequestSent(event.getRequest(), flowContext); supervisor.onRequestSent(flowContext, event); } @@ -454,7 +458,7 @@ } catch (Exception e) { error(flowContext, e); } finally { - flowContext.unlock(); + flowContext.signalAndUnlock(); } } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Tue May 05 15:01:13 2020 +0200 @@ -41,6 +41,15 @@ return true; } + public static boolean lockAndWaitForResponse(FlowContext flowContext) throws InterruptedException { + flowContext.lock(); + try { + return waitForResponse(flowContext, Timeouts.DEFAULT_TIMEOUT); + } finally { + flowContext.signalAndUnlock(); + } + } + public static boolean waitForResponse(FlowContext flowContext) throws InterruptedException { return waitForResponse(flowContext, Timeouts.DEFAULT_TIMEOUT); }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Tue May 05 15:01:13 2020 +0200 @@ -567,7 +567,7 @@ if (flowContext.sentEvent() != null) { flowContext.writeStartTime = timeGenerator.currentTimeMillis(); flowContext.writeEndTime = -1; - flowContext.client.onDataWriteStart(flowContext); + flowContext.flowHandler.onDataWriteStart(flowContext); } } finally { flowContext.signalAndUnlock(); @@ -588,7 +588,7 @@ } flowContext.writeEndTime = now; - flowContext.client.onDataWriteEnd(flowContext); + flowContext.flowHandler.onDataWriteEnd(flowContext); } } finally { flowContext.signalAndUnlock();
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue May 05 15:01:13 2020 +0200 @@ -85,8 +85,8 @@ super.writeMetrics(container); sessions.forEach((s, f) -> { - if (f.client != null) { - f.client.writeMetrics(container); + if (f.flowHandler != null) { + f.flowHandler.writeMetrics(container); } }); @@ -325,7 +325,7 @@ @Override public void onDisconnected(FlowContext flowContext) { if (collectMetrics) { - flowContext.client.writeMetrics(deferredMetrics); + flowContext.flowHandler.writeMetrics(deferredMetrics); } sessions.remove(flowContext.session);
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Tue May 05 15:01:13 2020 +0200 @@ -13,7 +13,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; + import static com.passus.st.Protocols.NETFLOW; +import static com.passus.st.client.FlowUtils.lockAndWaitForResponse; +import static com.passus.st.client.FlowUtils.waitForResponse; public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware { @@ -36,6 +40,17 @@ } @Override + public void onConnected(FlowContext flowContext) { + try { + //Waiting for greeting + waitForResponse(flowContext); + //Thread.sleep(5_000); + } catch (InterruptedException ignore) { + + } + } + + @Override protected void onRequestSent0(MySqlPacket req, FlowContext flowContext) { if (collectMetrics) { if (req.getPacketType() == MySqlPacketTypes.QUERY_COMMAND) {
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Tue May 05 15:01:13 2020 +0200 @@ -13,14 +13,13 @@ import com.passus.st.client.FlowContext; import com.passus.st.client.credentials.Credentials; import com.passus.st.client.credentials.CredentialsProvider; -import com.passus.st.filter.CredentialsProviderNodeDefinition; import com.passus.st.plugin.PluginConstants; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; -import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; +import static com.passus.config.schema.ConfigurationSchemaBuilder.*; import static com.passus.net.mysql.MySqlUtils.passScramble411; +import static com.passus.st.filter.CredentialsProviderNodeDefinition.providersNodeDef; @NodeDefinitionCreate(MySqlLoginFilter.MySqlLoginFilterNodeDefinitionCreator.class) @Plugin(name = MySqlLoginFilter.TYPE, category = PluginConstants.CATEGORY_FLOW_FILTER) @@ -28,10 +27,14 @@ private static final Logger LOGGER = LogManager.getLogger(MySqlLoginFilter.class); + private static final String DEFAULT_AUTH_PLUGIN = "mysql_native_password"; + public static final String TYPE = "mySqlLogin"; protected CredentialsProvider credentialsProvider; + private String defaultAuthPlugin = DEFAULT_AUTH_PLUGIN; + @Override public MySqlLoginFilter instanceForWorker(int index) { MySqlLoginFilter worker = new MySqlLoginFilter(); @@ -42,32 +45,53 @@ @Override public void configure(Configuration config, ConfigurationContext context) { credentialsProvider = (CredentialsProvider) config.get("provider"); + defaultAuthPlugin = config.getString("defaultAuthPlugin", DEFAULT_AUTH_PLUGIN); } @Override public int filterInbound(MySqlPacket req, MySqlPacket resp, FlowContext context) { if (resp.getPacketType() == 
MySqlPacketTypes.GREETINGS) { - context.setParam("mysql.greetings", resp); + context.setParam("mysql.greeting", resp); } return DUNNO; } + private void modifyLoginReq(MySqlLoginRequest loginReq, MySqlServerGreetingResponse greeting, FlowContext context) { + Credentials credentials = credentialsProvider.getCredentials(context); + if (credentials != null) { + String seed = greeting.getSalt() + greeting.getAuthSalt(); + String authPlugin = greeting.getAuthPlugin() == null ? defaultAuthPlugin : greeting.getAuthPlugin(); + String pass = credentials.getPassword(); + + switch (authPlugin) { + case "mysql_native_password": + byte[] passScramble = passScramble411( + pass.getBytes(), + seed.getBytes() + ); + + loginReq.setPassword(passScramble); + break; + case "mysql_clear_password": + loginReq.setPassword(pass.getBytes()); + break; + default: + LOGGER.warn("Not supported MySql auth plugin '" + authPlugin + "'."); + return; + } + + loginReq.setUsername(credentials.getUser()); + loginReq.setAuthPlugin(authPlugin); + } + } + @Override public int filterOutbound(MySqlPacket req, MySqlPacket resp, FlowContext context) { if (req.getPacketType() == MySqlPacketTypes.LOGIN_REQUEST) { - MySqlServerGreetingResponse greetings = context.getParamValue("mysql.greetings"); - if (greetings != null) { - MySqlLoginRequest loginReq = (MySqlLoginRequest) req; - Credentials credentials = credentialsProvider.getCredentials(context); - if (credentials != null) { - byte[] passScramble = passScramble411( - credentials.getPassword().getBytes(), - greetings.getAuthSalt().getBytes() - ); - loginReq.setUsername(credentials.getUser()); - loginReq.setPassword(passScramble); - } + MySqlServerGreetingResponse greeting = context.getParamValue("mysql.greeting"); + if (greeting != null) { + modifyLoginReq((MySqlLoginRequest) req, greeting, context); } } @@ -79,9 +103,11 @@ @Override public NodeDefinition create() { return mapDef( - tupleDef("provider", 
CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true) + tupleDef("provider", providersNodeDef()).setRequired(true), + tupleDef("defaultAuthPlugin", valueDef()).setRequired(false) ); } + } }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapMySqlListener.java Mon May 04 14:47:48 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapMySqlListener.java Tue May 05 15:01:13 2020 +0200 @@ -16,7 +16,7 @@ @Override public void onMessageReceived(SessionContext context, MySqlPacket msg, long timestamp) { //Nie przetwarzamy odpowiedzi - if (!msg.isRequest()) { + if (msg.isRequest()) { firePayloadEvent(msg, null, context, timestamp); } }