Mercurial > stress-tester
changeset 1085:351208f87e5d
OperationEvent, FlowContext.eventQueue
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Wed May 06 15:34:24 2020 +0200
@@ -6,10 +6,7 @@
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
 
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -40,6 +37,8 @@
 
     protected SessionPayloadEvent sentEvent;
 
+    protected Object receivedResponse;
+
     protected byte state = STATE_NEW;
 
     protected long connectionTime = -1;
@@ -78,7 +77,7 @@
     private Map<String, Object> params;
 
     public FlowContext(SessionInfo session) {
-        this(session, null);
+        this(session, new LinkedList<>());
     }
 
     public FlowContext(SessionInfo session, Deque<Event> eventQeueu) {
@@ -210,6 +209,14 @@
         return null;
     }
 
+    public <T> T getReceivedResponse() {
+        return (T) receivedResponse;
+    }
+
+    public void setReceivedResponse(Object receivedResponse) {
+        this.receivedResponse = receivedResponse;
+    }
+
     @Deprecated
     public DataDecoder decoder() {
         return decoder;
@@ -248,47 +255,35 @@
     }
 
     public void queueAddFirst(Event[] events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addFirst(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addFirst(event);
         }
     }
 
     public void queueAddFirst(Collection<Event> events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addFirst(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addFirst(event);
         }
     }
 
     public void queueAddFirst(Event event) {
-        if (eventQeueu != null) {
-            eventQeueu.addFirst(event);
-        }
+        eventQeueu.addFirst(event);
     }
 
     public void queueAddLast(Event[] events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addLast(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addLast(event);
         }
     }
 
     public void queueAddLast(Collection<Event> events) {
-        if (eventQeueu != null) {
-            for (Event event : events) {
-                eventQeueu.addLast(event);
-            }
+        for (Event event : events) {
+            eventQeueu.addLast(event);
         }
     }
 
     public void queueAddLast(Event event) {
-        if (eventQeueu != null) {
-            eventQeueu.addLast(event);
-        }
+        eventQeueu.addLast(event);
     }
 
     public Object getParam(String name) {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Wed May 06 15:34:24 2020 +0200
@@ -250,8 +250,7 @@
             flowContext.flowHandler.onDisconnected(flowContext);
         } finally {
             flowContext.clear();
-            flowContext.signal();
-            flowContext.unlock();
+            flowContext.signalAndUnlock();
         }
     }
 
@@ -273,82 +272,6 @@
         }
     }
 
-    private void responseReceived0(FlowContext flowContext, Object response) {
-        supervisor.onResponseReceived(flowContext, response);
-        flowContext.flowHandler.onResponseReceived(response, flowContext);
-        flowContext.sentEvent(null);
-        flowContext.receivedStartTimestamp(-1);
-    }
-
-    @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
-        FlowContext flowContext = context.getFlowContext();
-        flowContext.lock();
-        try {
-            try {
-                FlowHandler client = flowContext.client();
-                FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                decoder.decode(data, flowContext);
-
-                long now = System.currentTimeMillis();
-                if (flowContext.receivedStartTimestamp() == -1) {
-                    flowContext.receivedStartTimestamp(now);
-                }
-
-                if (decoder.state() == DataDecoder.STATE_ERROR) {
-                    if (collectMetric) {
-                        synchronized (metric) {
-                            metric.incErrorNum();
-                        }
-                    }
-
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, "Decoder error. " + decoder.getLastError());
-                    }
-
-                    decoder.clear(flowContext);
-                    responseReceived0(flowContext, null);
-                } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                    if (collectMetric) {
-                        synchronized (metric) {
-                            metric.incResponsesNum();
-                            metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
-                        }
-                    }
-
-                    Object resp = decoder.getResult();
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + ".");
-                    }
-
-                    Object req = null;
-                    if (flowContext.sentEvent() != null) {
-                        req = flowContext.sentEvent().getRequest();
-                    }
-
-                    decoder.clear(flowContext);
-                    if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) {
-                        responseReceived0(flowContext, resp);
-                    }
-                }
-            } catch (Exception e) {
-                if (collectMetric) {
-                    synchronized (metric) {
-                        metric.incErrorNum();
-                    }
-                }
-
-                if (logger.isDebugEnabled()) {
-                    debug(logger, flowContext, e.getMessage(), e);
-                }
-
-                error(flowContext, FlowError.unknownError());
-            }
-        } finally {
-            flowContext.signalAndUnlock();
-        }
-    }
-
     @Override
     public void dataWriteStart(ChannelContext context) {
         FlowContext flowContext = context.getFlowContext();
@@ -430,6 +353,7 @@
         }
 
         try {
+            flowContext.receivedResponse = null;
             flowContext.sentEvent = event;
             flowContext.writeStartTime = -1;
             flowContext.writeEndTime = -1;
@@ -461,4 +385,82 @@
             flowContext.signalAndUnlock();
         }
     }
+
+    private void responseReceived0(FlowContext flowContext, Object response, boolean blocked) {
+        flowContext.receivedResponse = response;
+        supervisor.onResponseReceived(flowContext, response);
+        if (!blocked) {
+            flowContext.flowHandler.onResponseReceived(response, flowContext);
+        }
+        flowContext.sentEvent = null;
+        flowContext.receivedStartTime = -1;
+    }
+
+    @Override
+    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+        FlowContext flowContext = context.getFlowContext();
+        flowContext.lock();
+        try {
+            try {
+                FlowHandler client = flowContext.client();
+                FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
+                decoder.decode(data, flowContext);
+
+                long now = System.currentTimeMillis();
+                if (flowContext.receivedStartTimestamp() == -1) {
+                    flowContext.receivedStartTimestamp(now);
+                }
+
+                if (decoder.state() == DataDecoder.STATE_ERROR) {
+                    if (collectMetric) {
+                        synchronized (metric) {
+                            metric.incErrorNum();
+                        }
+                    }
+
+                    if (logger.isDebugEnabled()) {
+                        debug(logger, flowContext, "Decoder error. " + decoder.getLastError());
+                    }
+
+                    decoder.clear(flowContext);
+                    responseReceived0(flowContext, null, false);
+                } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                    if (collectMetric) {
+                        synchronized (metric) {
+                            metric.incResponsesNum();
+                            metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp());
+                        }
+                    }
+
+                    Object resp = decoder.getResult();
+                    if (logger.isDebugEnabled()) {
+                        debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + ".");
+                    }
+
+                    Object req = null;
+                    if (flowContext.sentEvent() != null) {
+                        req = flowContext.sentEvent().getRequest();
+                    }
+
+                    decoder.clear(flowContext);
+                    boolean blocked = filterChain.filterInbound(req, resp, flowContext) != Filter.DENY;
+                    responseReceived0(flowContext, resp, blocked);
+                }
+            } catch (Exception e) {
+                if (collectMetric) {
+                    synchronized (metric) {
+                        metric.incErrorNum();
+                    }
+                }
+
+                if (logger.isDebugEnabled()) {
+                    debug(logger, flowContext, e.getMessage(), e);
+                }
+
+                error(flowContext, FlowError.unknownError());
+            }
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Wed May 06 15:34:24 2020 +0200
@@ -56,7 +56,7 @@
 
     public static boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException {
         long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
-        while (flowContext.sentEvent != null && !flowContext.isError()) {
+        while (flowContext.receivedResponse == null && !flowContext.isError()) {
             if (timeNanos <= 0) {
                 return false;
             }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Wed May 06 15:34:24 2020 +0200
@@ -148,7 +148,7 @@
     }
 
     protected FlowContext createFlowContext(SessionInfo session) {
-        FlowContext flowContext = new FlowContext(session, null);
+        FlowContext flowContext = new FlowContext(session);
         flowContext.createLock();
         return flowContext;
     }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/OperationEvent.java Wed May 06 15:34:24 2020 +0200
@@ -0,0 +1,37 @@
+package com.passus.st.client;
+
+import java.io.IOException;
+
+public abstract class OperationEvent implements Event {
+
+    public static final int TYPE = 13;
+
+    private long timestamp;
+
+    public OperationEvent() {
+        timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public int getType() {
+        return TYPE;
+    }
+
+    @Override
+    public String getSourceName() {
+        return null;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public abstract void execute() throws IOException;
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Wed May 06 15:34:24 2020 +0200
@@ -112,7 +112,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session, eventsQueue);
+        FlowContext flowContext = new FlowContext(session);
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed May 06 15:34:24 2020 +0200
@@ -10,6 +10,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -110,7 +112,7 @@
             return null;
         }
 
-        FlowContext flowContext = new FlowContext(session, eventsQueue);
+        FlowContext flowContext = new FlowContext(session, new LinkedList<>());
         flowContext.createLock();
         FlowHandler client = clientFactory.create(session.getProtocolId());
         client.init(flowContext);
@@ -202,6 +204,7 @@
         if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) {
             flowContext = registerAndConnect(statusEvent.getSessionInfo(), wait);
             flowContext.sessionEstablishedSeen = true;
+            processFlowEvents(flowContext);
         } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) {
             flowContext = flowContext(statusEvent);
             if (flowContext != null) {
@@ -213,6 +216,21 @@
         }
     }
 
+    protected void processFlowEvents(FlowContext flowContext) {
+        while (!flowContext.eventQeueu.isEmpty()) {
+            Event event = flowContext.eventQeueu.removeFirst();
+            if (event.getType() == SessionPayloadEvent.TYPE) {
+                flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
+            } else if (event.getType() == OperationEvent.TYPE) {
+                try {
+                    ((OperationEvent) event).execute();
+                } catch (IOException e) {
+                    errorInternal(flowContext, e);
+                }
+            }
+        }
+    }
+
     protected void sleep(Event event) {
         if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) {
             if (lastEventTimestamp != -1) {
@@ -243,6 +261,7 @@
             if (event.getType() == SessionStatusEvent.TYPE) {
                 SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent;
                 processFlowSessionStatusEvent(statusEvent, true);
+                return;
             } else if (event.getType() == SessionPayloadEvent.TYPE) {
                 FlowContext flowContext = flowContext(sessEvent);
 
@@ -261,7 +280,8 @@
                 }
 
                 if (flowContext != null && flowContext.state == STATE_CONNECTED) {
-                    flowProcessor.send(flowContext, (SessionPayloadEvent) event, true);
+                    flowContext.queueAddLast(event);
+                    processFlowEvents(flowContext);
                 }
             }
         } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) {
@@ -333,7 +353,7 @@
 
         @Override
         public void onResponseReceived(FlowContext flowContext, Object response) {
-            fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext);
+            fireResponseReceived(flowContext.sentRequest(), response, flowContext);
         }
     }
 }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/WaitForResponseEvent.java Wed May 06 15:34:24 2020 +0200
@@ -0,0 +1,42 @@
+package com.passus.st.client;
+
+import java.io.IOException;
+
+import static com.passus.st.client.FlowUtils.waitForResponse;
+
+public final class WaitForResponseEvent extends OperationEvent {
+
+    public static int TYPE = 13;
+
+    private final FlowContext flowContext;
+
+    private final boolean closeOnTimeout;
+
+    public WaitForResponseEvent(FlowContext flowContext) {
+        this(flowContext, true);
+    }
+
+    public WaitForResponseEvent(FlowContext flowContext, boolean closeOnTimeout) {
+        this.flowContext = flowContext;
+        this.closeOnTimeout = closeOnTimeout;
+    }
+
+    @Override
+    public void execute() throws IOException {
+        flowContext.lock();
+        try {
+            boolean ok = true;
+            try {
+                ok = waitForResponse(flowContext);
+            } catch (InterruptedException ignore) {
+
+            }
+
+            if (!ok && closeOnTimeout) {
+                flowContext.channelContext().close();
+            }
+        } finally {
+            flowContext.signalAndUnlock();
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/MySqlFlowHandler.java Wed May 06 15:34:24 2020 +0200
@@ -9,15 +9,12 @@
 import com.passus.net.mysql.MySqlQueryCommand;
 import com.passus.st.client.AbstractFlowHandler;
 import com.passus.st.client.FlowContext;
+import com.passus.st.client.WaitForResponseEvent;
 import com.passus.st.client.pgsql.PgSqlFlowHandler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-
 import static com.passus.st.Protocols.NETFLOW;
-import static com.passus.st.client.FlowUtils.lockAndWaitForResponse;
-import static com.passus.st.client.FlowUtils.waitForResponse;
 
 public class MySqlFlowHandler extends AbstractFlowHandler<MySqlMetric, MySqlPacket, MySqlPacket> implements TimeAware {
 
@@ -41,13 +38,7 @@
 
     @Override
     public void onConnected(FlowContext flowContext) {
-        try {
-            //Waiting for greeting
-            waitForResponse(flowContext);
-            //Thread.sleep(5_000);
-        } catch (InterruptedException ignore) {
-
-        }
+        flowContext.queueAddFirst(new WaitForResponseEvent(flowContext));
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlFilter.java Wed May 06 15:34:24 2020 +0200
@@ -8,7 +8,7 @@
 
     @Override
     public int filterInbound(Object req, Object resp, FlowContext context) {
-        if (resp instanceof MySqlPacket) {
+        if (req instanceof MySqlPacket || resp instanceof MySqlPacket) {
             return filterInbound((MySqlPacket) req, (MySqlPacket) resp, context);
         }
         return DUNNO;
--- a/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/mysql/filter/MySqlLoginFilter.java Wed May 06 15:34:24 2020 +0200
@@ -39,6 +39,7 @@
     public MySqlLoginFilter instanceForWorker(int index) {
         MySqlLoginFilter worker = new MySqlLoginFilter();
         worker.credentialsProvider = credentialsProvider;
+        worker.defaultAuthPlugin = defaultAuthPlugin;
         return worker;
     }
 
@@ -83,6 +84,7 @@
 
             loginReq.setUsername(credentials.getUser());
             loginReq.setAuthPlugin(authPlugin);
+            loginReq.setAttributes(null);
         }
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Wed May 06 15:34:24 2020 +0200
@@ -144,8 +144,8 @@
                 logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
             }
 
+            setOpRead(key);
             keyContext.handler.channelActive(keyContext.channelContext);
-            setOpRead(key);
         } catch (Exception ex) {
             logger.error(ex.getMessage(), ex);
         }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed May 06 15:34:24 2020 +0200
@@ -106,9 +106,8 @@
                 return;
             }
 
-
             if (logger.isDebugEnabled()) {
-                logger.debug("Readed {}B ({} -> {})", buffer.readableBytes(),
+                logger.debug("Read {}B ({} -> {})", totalRead,
                         channelContext.getLocalAddress(),
                         channelContext.getRemoteAddress());
             }
--- a/stress-tester/src/test/java/com/passus/st/client/EventTest.java Wed May 06 08:42:28 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/EventTest.java Wed May 06 15:34:24 2020 +0200
@@ -2,24 +2,22 @@
 
 import com.passus.commons.Loader;
 import com.passus.commons.Resolver;
-import java.lang.reflect.Constructor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.lang.reflect.Parameter;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
-import org.testng.Assert;
-import org.testng.annotations.Test;
 
 /**
- *
  * @author mikolaj.podbielski
  */
 public class EventTest {
 
     @Test
-    public void testGetTypeIsUnique() throws ReflectiveOperationException {
+    public void testGetTypeIsUnique() throws Exception {
         Set<Class<?>> classes = find();
         Map<Class, Integer> types = getEventTypes(classes);
         Map<Integer, Class> reverseMap = new LinkedHashMap<>();
@@ -30,7 +28,6 @@
                         + e.getKey().getName() + " " + oldKey.getName());
             }
         }
-
     }
 
     public static void main(String[] args) throws Exception {
@@ -41,14 +38,17 @@
         }
     }
 
-    private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws ReflectiveOperationException {
+    private static Map<Class, Integer> getEventTypes(Set<Class<?>> classes) throws Exception {
         Map<Class, Integer> types = new LinkedHashMap<>(classes.size());
         for (final Class<?> clazz : classes) {
-            Field field = clazz.getDeclaredField("TYPE");
-            if (field.isAccessible()) {
-                types.put(clazz, (Integer) field.get(null));
+            try {
+                Field field = clazz.getDeclaredField("TYPE");
+                if (field.isAccessible()) {
+                    types.put(clazz, (Integer) field.get(null));
+                }
+            } catch (Exception e) {
+                throw new Exception("Unable to process " + clazz.getName() + ". " + e.getMessage(), e);
             }
-
         }
         return types;
     }