Mercurial > stress-tester
changeset 1059:1d47e67bad78
FlowContext - events queue manipulation + PgSql processing enhanced
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Apr 16 12:52:37 2020 +0200 @@ -6,6 +6,8 @@ import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; +import java.util.Collection; +import java.util.Deque; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Condition; @@ -26,6 +28,8 @@ protected final SessionInfo session; + protected final Deque<Event> eventQeueu; + protected volatile boolean blocked; protected volatile boolean sessionEstablishedSeen; @@ -74,7 +78,12 @@ private Map<String, Object> params; public FlowContext(SessionInfo session) { + this(session, null); + } + + public FlowContext(SessionInfo session, Deque<Event> eventQeueu) { this.session = session; + this.eventQeueu = eventQeueu; } public void createLock() { @@ -147,6 +156,10 @@ return timeout; } + public void block() { + blocked = true; + } + public boolean isBlocked() { return blocked; } @@ -177,16 +190,24 @@ this.writeStartTime = sendStartTimestamp; } + public boolean isEventSent() { + return (sentEvent != null); + } + public SessionPayloadEvent sentEvent() { return sentEvent; } - public boolean isEventSent() { - return (sentEvent != null); + public void sentEvent(SessionPayloadEvent sentEvent) { + this.sentEvent = sentEvent; } - public void sentEvent(SessionPayloadEvent sentEvent) { - this.sentEvent = sentEvent; + public Object sentRequest() { + if (sentEvent != null) { + return sentEvent.getRequest(); + } + + return null; } @Deprecated @@ -226,6 +247,50 @@ return session; } + public void queueAddFirst(Event[] events) { + if (eventQeueu != null) { + for (Event event : events) { + eventQeueu.addFirst(event); + } + } + } + + public void queueAddFirst(Collection<Event> events) { + if (eventQeueu != null) { + for (Event event : events) { + eventQeueu.addFirst(event); + } + } + } + + public void queueAddFirst(Event event) { + if (eventQeueu != null) { + eventQeueu.addFirst(event); + } + } + + public void queueAddLast(Event[] events) { + if (eventQeueu != null) { + for (Event event : events) { + eventQeueu.addLast(event); + } + } + } + + public void queueAddLast(Collection<Event> events) { + if (eventQeueu != null) { + for (Event event : events) { + eventQeueu.addLast(event); + } + } + } + + public void queueAddLast(Event event) { + if (eventQeueu != null) { + eventQeueu.addLast(event); + } + } + public Object getParam(String name) { if (params == null) { return null;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Thu Apr 16 12:52:37 2020 +0200 @@ -402,6 +402,9 @@ buffer = flowContext.buffer(); try { encoder.encode(req, flowContext, buffer); + if (trace) { + logger.trace("Request encoded: " + req); + } } catch (Exception e) { flowContext.encoderErrors++; if (logger.isDebugEnabled()) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 16 12:52:37 2020 +0200 @@ -148,7 +148,7 @@ } protected FlowContext createFlowContext(SessionInfo session) { - FlowContext flowContext = new FlowContext(session); + FlowContext flowContext = new FlowContext(session, null); flowContext.createLock(); return flowContext; }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Apr 16 12:52:37 2020 +0200 @@ -112,7 +112,7 @@ return null; } - FlowContext flowContext = new FlowContext(session); + FlowContext flowContext = new FlowContext(session, eventsQueue); flowContext.createLock(); FlowHandler client = clientFactory.create(session.getProtocolId()); client.init(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Apr 16 12:52:37 2020 +0200 @@ -44,8 +44,6 @@ public SynchFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); - SynchWrapper supervisor = new SynchWrapper(this); - flowProcessor = new FlowProcessor(supervisor, logger, index); } @Override @@ -89,7 +87,7 @@ return null; } - FlowContext flowContext = new FlowContext(session); + FlowContext flowContext = new FlowContext(session, eventsQueue); flowContext.createLock(); FlowHandler client = clientFactory.create(session.getProtocolId()); @@ -263,6 +261,9 @@ @Override public void run() { working = true; + SynchWrapper supervisor = new SynchWrapper(this); + flowProcessor = new FlowProcessor(supervisor, logger, index); + while (working) { Event event = null; try {
--- a/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFlowUtils.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpFlowUtils.java Thu Apr 16 12:52:37 2020 +0200 @@ -20,7 +20,7 @@ } public static FlowContext createFlowContext(SessionInfo session, HttpScopes scopes) { - FlowContext context = new FlowContext(session); + FlowContext context = new FlowContext(session, null); context.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(context, scopes)); return context; }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/PgSqlFlowHandlerNetflowFlowHandler.java Thu Apr 16 12:52:37 2020 +0200 @@ -3,16 +3,23 @@ import com.passus.commons.Assert; import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; +import com.passus.net.pgsql.PgSqlErrorResponseMessage; import com.passus.net.pgsql.PgSqlMessage; import com.passus.net.pgsql.PgSqlMessageType; import com.passus.net.pgsql.PgSqlSimpleQueryMessage; import com.passus.st.client.AbstractFlowHandler; import com.passus.st.client.FlowContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; import static com.passus.st.Protocols.NETFLOW; public class PgSqlFlowHandlerNetflowFlowHandler extends AbstractFlowHandler<PgSqlMetric, PgSqlMessage, PgSqlMessage> implements TimeAware { + private final Logger LOGGER = LogManager.getLogger(PgSqlFlowHandlerNetflowFlowHandler.class); + TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); public PgSqlFlowHandlerNetflowFlowHandler() { @@ -30,10 +37,10 @@ } @Override - protected void onRequestSent0(PgSqlMessage msg, FlowContext flowContext) { + protected void onRequestSent0(PgSqlMessage req, FlowContext flowContext) { if (collectMetrics) { - if (msg.getType() == PgSqlMessageType.SIMPLE_QUERY) { - PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) msg; + if (req.getType() == PgSqlMessageType.SIMPLE_QUERY) { + PgSqlSimpleQueryMessage simpleQueryMsg = (PgSqlSimpleQueryMessage) req; synchronized (metric) { metric.addQuery(simpleQueryMsg.getQuery()); } @@ -41,9 +48,36 @@ } } + private void disconnectAndBlock(FlowContext flowContext) { + try { + flowContext.block(); + flowContext.channelContext().close(); + } catch (IOException ignore) { + + } finally { + + } + } + @Override - protected void onResponseReceived0(PgSqlMessage msg, FlowContext flowContext) { - //flowContext.channelContext().close(); + protected void onResponseReceived0(PgSqlMessage resp, FlowContext flowContext) { + if (resp.getType() == PgSqlMessageType.ERROR_RESPONSE) { + PgSqlErrorResponseMessage errorMsg = (PgSqlErrorResponseMessage) resp; + PgSqlMessage req = (PgSqlMessage) flowContext.sentRequest(); + if (req.getType() == PgSqlMessageType.STARTUP_MESSAGE) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("PgSql auth failed. Server message " + errorMsg); + } + + disconnectAndBlock(flowContext); + } else if(errorMsg.isConnectionExceptionClass()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Fatal error. Server message " + errorMsg); + } + + disconnectAndBlock(flowContext); + } + } } @Override
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlFilter.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlFilter.java Thu Apr 16 12:52:37 2020 +0200 @@ -5,7 +5,6 @@ import com.passus.st.filter.FlowFilter; /** - * * @author mikolaj.podbielski */ public abstract class PgSqlFilter implements FlowFilter { @@ -24,13 +23,13 @@ @Override public int filterOutbound(Object req, Object resp, FlowContext context) { - if (req instanceof PgSqlMessage) { - return filterOutbound((PgSqlMessage) req, resp, context); + if (req instanceof PgSqlMessage || resp instanceof PgSqlMessage) { + return filterOutbound((PgSqlMessage) req, (PgSqlMessage) resp, context); } return DUNNO; } - public int filterOutbound(PgSqlMessage req, Object resp, FlowContext context) { + public int filterOutbound(PgSqlMessage req, PgSqlMessage resp, FlowContext context) { return DUNNO; } }
--- a/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlLoginFilter.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/pgsql/filter/PgSqlLoginFilter.java Thu Apr 16 12:52:37 2020 +0200 @@ -4,29 +4,25 @@ import com.passus.config.Configuration; import com.passus.config.ConfigurationContext; import com.passus.config.annotations.NodeDefinitionCreate; -import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; -import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; -import com.passus.config.schema.MapNodeDefinition; import com.passus.config.schema.NodeDefinition; import com.passus.config.schema.NodeDefinitionCreator; -import com.passus.net.pgsql.PgSqlAuthRequestMessage; -import com.passus.net.pgsql.PgSqlAuthRequestMessageMD5; -import com.passus.net.pgsql.PgSqlMessage; -import com.passus.net.pgsql.PgSqlPasswordMessage; +import com.passus.net.pgsql.*; import com.passus.st.client.FlowContext; import com.passus.st.client.credentials.Credentials; import com.passus.st.client.credentials.CredentialsProvider; import com.passus.st.filter.CredentialsProviderNodeDefinition; import com.passus.st.plugin.PluginConstants; -import static com.passus.st.utils.HashUtils.HEX_L; -import static com.passus.st.utils.HashUtils.getMD5; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; + +import static com.passus.config.schema.ConfigurationSchemaBuilder.*; +import static com.passus.st.utils.HashUtils.HEX_L; +import static com.passus.st.utils.HashUtils.getMD5; + /** - * * @author mikolaj.podbielski */ @NodeDefinitionCreate(PgSqlLoginFilter.PgSqlLoginFilterNodeDefinitionCreator.class) @@ -39,21 +35,25 @@ protected CredentialsProvider credentialsProvider; + protected String dbName; + @Override public PgSqlLoginFilter instanceForWorker(int index) { PgSqlLoginFilter worker = new PgSqlLoginFilter(); worker.credentialsProvider = credentialsProvider; + worker.dbName = dbName; return worker; } @Override public void configure(Configuration config, ConfigurationContext context) { - credentialsProvider = config.get("provider", null); + credentialsProvider = (CredentialsProvider) config.get("provider"); + dbName = (String) config.get("dbName"); } @Override public int filterInbound(Object req, PgSqlMessage resp, FlowContext context) { - if (resp instanceof PgSqlAuthRequestMessage) { + if (resp.getType() == PgSqlMessageType.AUTH_REQUEST) { PgSqlAuthRequestMessage authReq = (PgSqlAuthRequestMessage) resp; context.setParam("authReq", authReq); } @@ -61,30 +61,40 @@ } @Override - public int filterOutbound(PgSqlMessage req, Object resp, FlowContext context) { - if (req instanceof PgSqlPasswordMessage) { + public int filterOutbound(PgSqlMessage req, PgSqlMessage resp, FlowContext context) { + if (req.getType() == PgSqlMessageType.STARTUP_MESSAGE) { + PgSqlStartupMessage startupMsg = (PgSqlStartupMessage) req; + Credentials credentials = credentialsProvider.getCredentials(context); + if (dbName != null) { + startupMsg.setParameter(PgSqlStartupMessage.PARAM_DATABASE, dbName); + } + + startupMsg.setParameter(PgSqlStartupMessage.PARAM_USER, credentials.getUser()); + context.setParam("authCredentials", credentials); + } else if (req.getType() == PgSqlMessageType.PASSWORD_MESSAGE) { PgSqlPasswordMessage passwordReq = (PgSqlPasswordMessage) req; PgSqlAuthRequestMessage authReq = context.getParamValue("authReq"); - PgSqlAuthRequestMessage.AuthType authType = authReq.getAuthType(); - - Credentials credentials = credentialsProvider.getCredentials(context); - String password = credentials.getPassword(); - String user = credentials.getUser(); + Credentials credentials = context.getParamValue("authCredentials"); + if (authReq != null && credentials != null) { + PgSqlAuthRequestMessage.AuthType authType = authReq.getAuthType(); + String password = credentials.getPassword(); + String user = credentials.getUser(); - switch (authType) { - case PLAINTEXT: - passwordReq.setPassword(password); - break; - case MD5: - PgSqlAuthRequestMessageMD5 authReqMD5 = (PgSqlAuthRequestMessageMD5) authReq; - byte[] salt = authReqMD5.getSalt(); - password = hashMD5(user, password, salt); - passwordReq.setPassword(password); - break; - default: - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Unsupported auth type: " + authType); - } + switch (authType) { + case PLAINTEXT: + passwordReq.setPassword(password); + break; + case MD5: + PgSqlAuthRequestMessageMD5 authReqMD5 = (PgSqlAuthRequestMessageMD5) authReq; + byte[] salt = authReqMD5.getSalt(); + password = hashMD5(user, password, salt); + passwordReq.setPassword(password); + break; + default: + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unsupported auth type: " + authType); + } + } } } return DUNNO; @@ -137,9 +147,10 @@ @Override public NodeDefinition create() { - MapNodeDefinition mapDef = mapDef(); - mapDef.add(tupleDef("provider", CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true)); - return mapDef; + return mapDef( + tupleDef("provider", CredentialsProviderNodeDefinition.providersNodeDef()).setRequired(true), + tupleDef("dbName", valueDef()).setRequired(false) + ); } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/StatelessTasks.java Thu Apr 16 12:52:37 2020 +0200 @@ -7,6 +7,7 @@ public static final Task READ_TASK = new Task(Task.READ); public static final Task FLUSH_TASK = new Task(Task.FLUSH); public static final Task CLOSE_TASK = new Task(Task.CLOSE); + public static final Task CLOSE_AND_BLOCK_TASK = new Task(Task.CLOSE_AND_BLOCK_TASK); private StatelessTasks() { }
--- a/stress-tester/src/main/java/com/passus/st/emitter/Task.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/Task.java Thu Apr 16 12:52:37 2020 +0200 @@ -7,6 +7,7 @@ public static final int READ = 3; public static final int FLUSH = 4; public static final int CLOSE = 5; + public static final int CLOSE_AND_BLOCK_TASK = 6; public final int code;
--- a/stress-tester/src/main/java/com/passus/st/job/TestJob.java Thu Apr 16 09:41:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/job/TestJob.java Thu Apr 16 12:52:37 2020 +0200 @@ -204,7 +204,7 @@ } // TODO: czy dodać filtry top-level do klienta??? - flowExecutor.addFilter(httpVarsFilter); + filters.forEach(flowExecutor::addFilter); } }