Mercurial > stress-tester
changeset 1035:46067bb4f3ce
ParallelFlowWorker in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Tue Apr 07 13:34:22 2020 +0200 @@ -272,7 +272,7 @@ } for (FlowWorker worker : workers) { - worker.disconnect(); + worker.disconnectAll(); worker.interrupt(); try { @@ -285,9 +285,9 @@ started = false; } - public void closeAllConnections() { + public void disconnectAll() { for (FlowWorker worker : workers) { - worker.disconnect(); + worker.disconnectAll(); } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java Tue Apr 07 13:34:22 2020 +0200 @@ -4,4 +4,6 @@ FlowHandler create(int protocolId); + FlowHandler create(Object message); + }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Tue Apr 07 13:34:22 2020 +0200 @@ -1,5 +1,10 @@ package com.passus.st.client; +import com.passus.net.dns.DnsRecord; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.net.netflow.Netflow; +import com.passus.net.pgsql.PgSqlMessage; import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; import com.passus.st.client.netflow.NetflowFlowHandler; @@ -25,4 +30,23 @@ throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'."); } + + @Override + public FlowHandler create(Object message) { + if (message == null) { + throw new NullPointerException(); + } + + if (message instanceof HttpRequest || message instanceof HttpResponse) { + return new HttpFlowHandler(); + } else if (message instanceof DnsRecord) { + return new DnsFlowHandler(); + } else if (message instanceof Netflow) { + return new NetflowFlowHandler(); + } else if (message instanceof PgSqlMessage) { + return new PgSqlFlowHandlerNetflowFlowHandler(); + } + + throw new IllegalArgumentException("Not supported class '" + message.getClass() + "'."); + } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Tue Apr 07 13:34:22 2020 +0200 @@ -314,6 +314,10 @@ } Object resp = decoder.getResult(); + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Response decoded. Response class " + resp.getClass().getSimpleName() + "."); + } + Object req = null; if (flowContext.sentEvent() != null) { req = flowContext.sentEvent().getRequest();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessorSupervisor.java Tue Apr 07 13:34:22 2020 +0200 @@ -31,16 +31,28 @@ FlowMetric getFlowMetric(); - void onConnected(FlowContext flowContext); - - void onRequestSent(FlowContext flowContext, SessionPayloadEvent event); + default void onConnected(FlowContext flowContext) { - void onResponseReceived(FlowContext flowContext, Object response); + } - void onDisconnecting(FlowContext flowContext); + default void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) { - void onDisconnected(FlowContext flowContext); + } - void onError(FlowContext flowContext); + default void onResponseReceived(FlowContext flowContext, Object response) { + + } + + default void onDisconnecting(FlowContext flowContext) { + + } + + default void onDisconnected(FlowContext flowContext) { + + } + + default void onError(FlowContext flowContext) { + + } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Tue Apr 07 13:34:22 2020 +0200 @@ -186,9 +186,17 @@ public abstract int activeConnections(); - public abstract void disconnect(); + public void disconnectAll() { + disconnectAll(true); + } - public abstract void disconnect(SessionInfo session); + public abstract void disconnectAll(boolean wait); + + public void disconnect(SessionInfo session) { + disconnect(session, true); + } + + public abstract void disconnect(SessionInfo session, boolean wait); public abstract void handle(Event event);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Tue Apr 07 13:34:22 2020 +0200 @@ -222,7 +222,12 @@ } @Override - public void disconnect() { + public void disconnect(SessionInfo session, boolean wait) { + + } + + @Override + public void disconnectAll(boolean wait) { for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { FlowContext flowContext = entry.getValue(); try {
--- a/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java Tue Apr 07 13:34:22 2020 +0200 @@ -20,12 +20,12 @@ } @Override - public void disconnect() { + public void disconnectAll(boolean wait) { } @Override - public void disconnect(SessionInfo session) { + public void disconnect(SessionInfo session, boolean wait) { }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Apr 07 13:34:22 2020 +0200 @@ -5,38 +5,39 @@ import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; +import com.passus.st.filter.FlowFilterChain; import com.passus.st.plugin.PluginConstants; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.LinkedList; -import java.util.Queue; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.Semaphore; -import static com.passus.st.client.FlowContext.*; -import static com.passus.st.client.FlowUtils.waitOpFinished; +import static com.passus.st.client.FlowContext.STATE_CONNECTED; +import static com.passus.st.client.FlowUtils.*; @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class ParallelFlowWorker { +public class ParallelFlowWorker extends FlowWorker { - public static final String TYPE = "parallel"; + public static final String TYPE = "parallel"; - /*public static final int DEFAULT_MAX_SENT_REQUESTS = 10; + public static final int DEFAULT_MAX_SENT_REQUESTS = 10; + public static final int DEFAULT_FLOW_EVENTS_QUEUE = 100; private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); + protected final Map<SessionInfo, FlowThread> sessions = new ConcurrentHashMap<>(); + private int maxSentRequests = DEFAULT_MAX_SENT_REQUESTS; private long eventsQueueWaitTime = 100; - private final Semaphore semaphore = new Semaphore(maxSentRequests); + private int flowEventsQueue = DEFAULT_FLOW_EVENTS_QUEUE; - private final Deque<LocalFlowContext> flowIndex = new ArrayDeque<>(); - - private boolean closeAllConnections = false; + private volatile boolean working; public ParallelFlowWorker(Emitter emitter, String name, int index) { super(emitter, name, index); @@ -51,20 +52,6 @@ this.maxSentRequests = maxSentRequests; } - @Override - protected LocalFlowContext flowContext(SessionInfo session) { - return (LocalFlowContext) super.flowContext(session); - } - - @Override - protected LocalFlowContext flowContext(ChannelContext context) { - return flowContext(context.getSessionInfo()); - } - - @Override - protected LocalFlowContext flowContext(SessionEvent event) { - return flowContext(event.getSessionInfo()); - } public long getEventsQueueWaitTime() { return eventsQueueWaitTime; @@ -75,180 +62,141 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } - protected void removeFlowContext(FlowContext flowContext) { - if (flowContext != null) { - flowIndex.remove(flowContext); - } + @Override + public boolean isWorking() { + return working; } @Override - protected LocalFlowContext createFlowContext(SessionInfo session) { - LocalFlowContext flowContext = new LocalFlowContext(session); - flowIndex.add(flowContext); - return flowContext; + public int activeConnections() { + return 0; } - private void waitCloseAllConnections() { - *//*closeAllConnections = true; - synchronized (lock) { - while (!flowIndex.isEmpty()) { - try { - lock.wait(10); - } catch (InterruptedException ignore) { - } - } - } - - closeAllConnections = false;*//* - throw new RuntimeException("Not implemented."); - } - -*//* @Override - protected boolean send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { - //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy - //najmniej uzywane. - if (flowIndex.size() > maxSentRequests) { - int diff = flowIndex.size() - maxSentRequests; - if (logger.isDebugEnabled()) { - debug(flowContext, "Too many connections {}.", flowIndex.size()); - } - - Iterator<LocalFlowContext> it = flowIndex.descendingIterator(); - while (it.hasNext()) { - LocalFlowContext indexFlowContext = it.next(); - if (indexFlowContext.eventsQueue.isEmpty() - && !indexFlowContext.isEventSent()) { - disconnect(flowContext); - if (--diff == 0) { - break; - } + @Override + public void disconnectAll(boolean wait) { + if (wait) { + for (; ; ) { + int size = eventsQueue.size(); + if (size == 0 + || (size == 1 && eventsQueue.peek().getType() == DataEvents.DataEnd.TYPE)) { + break; } } } - return super.send(flowContext, event); - + sessions.forEach((k, flowThread) -> { + flowThread.disconnect(wait); - }*//* + flowThread.working = false; + flowThread.interrupt(); + try { + flowThread.join(); + } catch (InterruptedException ignore) { - private boolean canSend(FlowContext flowContext) { - return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent(); + } + }); + } @Override - protected void flowStateChanged(FlowContext flowContext, int oldState) { - LocalFlowContext localFlowContext = (LocalFlowContext) flowContext; - *//*if (oldState == FlowContext.STATE_REQ_SENT) { - if (semaphore.availablePermits() <= maxSentRequests) { - semaphore.release(); - } - }*//* + public void disconnect(SessionInfo session, boolean wait) { + FlowThread flowThread = flowThread(session); + if (flowThread != null) { + flowThread.disconnect(wait); + } + } - if (closeAllConnections) { - if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && !localFlowContext.isEventSent() - && localFlowContext.eventsQueue.isEmpty()) { - disconnect(flowContext); - return; + private FlowThread register(SessionInfo session) { + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. Session already registered."); + return null; + } + + FlowContext flowContext = new FlowContext(session); + flowContext.createLock(); + FlowHandler client = clientFactory.create(session.getProtocolId()); + client.init(flowContext); + flowContext.client(client); + + FlowThread flowThread = new FlowThread(emitter, flowContext, flowEventsQueue); + flowThread.start(); + sessions.put(session, flowThread); + return flowThread; + } + + protected FlowThread flowThread(SessionEvent event) { + return flowThread(event.getSessionInfo()); + } + + protected FlowThread flowThread(ChannelContext context) { + return flowThread(context.getSessionInfo()); + } + + protected FlowThread flowThread(SessionInfo session) { + FlowThread flowThread = sessions.get(session); + if (flowThread == null) { + if (logger.isDebugEnabled()) { + logger.debug("Context for session '" + session + "' not found."); } } - if (localFlowContext.state() >= FlowContext.STATE_CONNECTED - && localFlowContext.state() < FlowContext.STATE_DISCONNECTING - && !localFlowContext.isEventSent() - && !localFlowContext.eventsQueue.isEmpty()) { + return flowThread; + } - Event event = localFlowContext.eventsQueue.peek(); - if (event.getType() == SessionStatusEvent.TYPE) { - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - localFlowContext.eventsQueue.poll(); - disconnect((SessionStatusEvent) event); - } - } else if (event.getType() == SessionPayloadEvent.TYPE - && canSend(flowContext)) { - localFlowContext.eventsQueue.poll(); - send(flowContext, (SessionPayloadEvent) event, true); - } else { - localFlowContext.eventsQueue.poll(); + private void waitNoSessions() { + while (!sessions.isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } } } - private void makeFirst(LocalFlowContext flowContext) { - flowIndex.remove(flowContext); - flowIndex.addFirst(flowContext); - } - - private void addToQueue(LocalFlowContext flowContext, Event event) { - flowContext.eventsQueue.add(event); - makeFirst(flowContext); - } - @Override public void handle(Event event) { - Event newEvent = null; - switch (event.getType()) { - case SessionPayloadEvent.TYPE: - semaphore.acquireUninterruptibly(); - newEvent = eventInstanceForWorker(event); - break; - case SessionStatusEvent.TYPE: - case DataEvents.DataLoopEnd.TYPE: - case DataEvents.DataEnd.TYPE: - newEvent = event; - } - - if (newEvent != null) { - try { - eventsQueue.put(newEvent); - } catch (Exception e) { - logger.debug("Unable to add event to queue. " + e.getMessage(), e); - } + try { + Event newEvent = eventInstanceForWorker(event, index); + eventsQueue.put(newEvent); + } catch (Exception e) { + logger.debug("Unable to add event to queue. " + e.getMessage(), e); } } private void processEvent(Event event) { + if (trace) { + logger.trace("Event processing: {}", event); + } + if (event instanceof SessionEvent) { switch (event.getType()) { - case SessionStatusEvent.TYPE: + case SessionStatusEvent.TYPE: { SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - LocalFlowContext flowContext = flowContext((SessionEvent) event); - if (flowContext != null) { - if (flowContext.eventsQueue.isEmpty() - && !flowContext.isEventSent()) { - disconnect(statusEvent); - } else { - addToQueue(flowContext, event); - } + FlowThread flowThread = flowThread(statusEvent); + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + if (flowThread == null) { + flowThread = register(statusEvent.getSessionInfo()); } } + + if (flowThread != null) { + flowThread.handle(event); + } + break; + } case SessionPayloadEvent.TYPE: { SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event; - LocalFlowContext flowContext = flowContext(payloadEvent); - if (flowContext != null) { - if (flowContext.state() >= FlowContext.STATE_CONNECTING - && flowContext.state() < FlowContext.STATE_DISCONNECTING) { - if (flowContext.eventsQueue.isEmpty() - && (flowContext.state() == FlowContext.STATE_CONNECTED - || flowContext.state() == FlowContext.STATE_ERROR - || flowContext.isEventSent())) { - send(flowContext, payloadEvent, true); - } else { - addToQueue(flowContext, event); - } - } - } else { - try { - SessionInfo session = payloadEvent.getSessionInfo(); - flowContext = (LocalFlowContext) register(session); - addToQueue(flowContext, event); - emitter.connect(session, this, index); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + FlowThread flowThread = flowThread(payloadEvent); + + if (connectPartialSession) { + SessionInfo sessionInfo = payloadEvent.getSessionInfo(); + flowThread = register(sessionInfo); + flowThread.handle(SessionStatusEvent.establishedEvent(sessionInfo)); + } + + if (flowThread != null) { + flowThread.handle(event); } break; @@ -260,7 +208,11 @@ logger.debug("DataLoopEnd received."); } - waitCloseAllConnections(); + sessions.forEach((session, flowThread) -> { + flowThread.handle(event); + }); + + waitNoSessions(); filterChain.reset(); } else if (event.getType() == DataEvents.DataEnd.TYPE) { if (logger.isDebugEnabled()) { @@ -289,110 +241,179 @@ } } - protected class FlowThread extends Thread { + class FlowThread extends Thread implements FlowProcessorSupervisor { + + private final Logger logger = LogManager.getLogger(FlowThread.class); + + private volatile boolean working; + + private final Emitter emitter; private final FlowContext flowContext; - private final Timeouts timeouts; - - private final BlockingQueue<Event> events; + private final FlowProcessor flowProcessor; - public FlowThread(FlowContext flowContext, Timeouts timeouts, int queueSize) { + private final BlockingQueue<Event> queue; + + private FlowThread(Emitter emitter, FlowContext flowContext, int queueSize) { + this.emitter = emitter; this.flowContext = flowContext; - this.timeouts = timeouts; - this.events = new ArrayBlockingQueue<>(queueSize); + flowProcessor = new FlowProcessor(this, logger, index); + this.queue = new ArrayBlockingQueue<>(queueSize); } - protected void connect(FlowContext flowContext, boolean wait) { - flowContext.lock(); + public FlowContext flowContext() { + return flowContext; + } + + @Override + public Emitter getEmitter() { + return emitter; + } + + @Override + public Timeouts getTimeouts() { + return timeouts; + } + + @Override + public FlowFilterChain getFilterChain() { + return filterChain; + } + + @Override + public int getMaxEncoderErrors() { + return maxEncoderErrors; + } + + @Override + public int getMaxConnectionAttempts() { + return maxConnectionAttempts; + } + + @Override + public int getMaxSendErrors() { + return maxSendErrors; + } + + @Override + public long getReconnectDelay() { + return reconnectDelay; + } + + @Override + public FlowContext flowContext(SessionInfo session) { + return flowContext; + } + + @Override + public boolean isCollectMetrics() { + return false; + } + + @Override + public FlowMetric getFlowMetric() { + return null; + } + + @Override + public void onResponseReceived(FlowContext flowContext, Object response) { + fireResponseReceived(flowContext.sentEvent.getRequest(), response, flowContext); + } + + public void disconnect(boolean wait) { + if (wait) { + while (!queue.isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + + } + } + } + + flowProcessor.disconnect(flowContext, wait); + working = false; + interrupt(); + try { - flowContext.connectionAttempts++; - flowContext.state = STATE_CONNECTING; - emitter.connect(flowContext.session, this, index); - if (wait) { - waitOpFinished(flowContext, STATE_CONNECTED); - } - } catch (Exception ex) { - error(flowContext, ex); - } finally { - flowContext.signalAndUnlock(); + join(); + } catch (InterruptedException e) { + } } - protected void disconnect(FlowContext flowContext) { - disconnect(flowContext, true); - } - - protected void disconnect(FlowContext flowContext, boolean wait) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Disconnect."); - } - - long now = timeGenerator.currentTimeMillis(); - flowContext.lock(); + public void handle(Event event) { try { - if (trace) { - debug(flowContext, "Disconnecting."); - } - - if (flowContext.state == STATE_DISCONNECTING - || flowContext.state == STATE_DISCONNECTED) { - return; - } - - flowContext.state = STATE_DISCONNECTING; - flowContext.timeout = now + timeouts.getDisconnectingTimeout(); - - try { - onDisconnecting(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Error occurred during onDisconnecting calling.", e); - } - } - - if (flowContext.channelContext() != null) { - try { - flowContext.channelContext().close(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - - if (wait) { - waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); - } - } catch (InterruptedException e) { - error(flowContext, e); - } finally { - flowContext.signalAndUnlock(); + queue.put(event); + } catch (Exception e) { + debug(logger, flowContext, "Unable to add event to queue. ", e); } } - protected void error(FlowContext flowContext, Throwable cause) { - error(flowContext, FlowError.interpret(cause)); + private void finish() { + flowProcessor.disconnect(flowContext, true); + working = false; + } - protected void error(FlowContext flowContext, FlowError error) { - if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) { - disconnect(flowContext, false); + @Override + public void run() { + working = true; + if (trace) { + debug(logger, flowContext, "Flow thread started."); } - flowContext.error(error); + while (working) { + Event event = null; + try { + event = queue.take(); + } catch (InterruptedException ignore) { + + } + + if (event != null) { + if (trace) { + trace(logger, flowContext, "Event processing: {}", event); + } + + if (event.getType() == SessionStatusEvent.TYPE) { + SessionStatusEvent statusEvent = (SessionStatusEvent) event; + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + flowProcessor.connect(flowContext, true); + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + finish(); + } + } else if (event.getType() == SessionPayloadEvent.TYPE) { + if (flowContext.blocked) { + continue; + } else if (mayReconnect(flowContext)) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "Reconnecting."); + } + + flowProcessor.connect(flowContext, true); + } + + if (flowContext.state == STATE_CONNECTED) { + flowProcessor.send(flowContext, (SessionPayloadEvent) event, true); + } + } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) { + if (logger.isDebugEnabled()) { + debug(logger, flowContext, "DataLoopEnd received."); + } + + finish(); + } + } + } + + if (trace) { + debug(logger, flowContext, "Flow thread stopped."); + } + + sessions.remove(flowContext.session); } } - protected class LocalFlowContext extends FlowContext { - - private final Queue<Event> eventsQueue; - - private LocalFlowContext(SessionInfo session) { - super(session); - eventsQueue = new LinkedList<>(); - } - - }*/ - }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java Tue Apr 07 13:34:22 2020 +0200 @@ -3,7 +3,6 @@ import com.passus.st.emitter.SessionInfo; /** - * * @author Mirosław Hawrot */ public class SessionStatusEvent extends SessionEvent { @@ -68,6 +67,14 @@ + '}'; } + public static SessionStatusEvent openingEvent(SessionInfo sessionInfo) { + return new SessionStatusEvent(sessionInfo, STATUS_OPENING); + } + + public static SessionStatusEvent establishedEvent(SessionInfo sessionInfo) { + return new SessionStatusEvent(sessionInfo, STATUS_ESTABLISHED); + } + @Override public SessionStatusEvent instanceForWorker(int index) { return new SessionStatusEvent(getSessionInfo(), status, getSourceName());
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Apr 07 13:34:22 2020 +0200 @@ -62,12 +62,6 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } - protected FlowContext createFlowContext(SessionInfo session) { - FlowContext flowContext = new FlowContext(session); - flowContext.createLock(); - return flowContext; - } - @Override public int activeConnections() { int count = 0; @@ -95,8 +89,9 @@ return null; } - FlowContext flowContext = createFlowContext(session); - //TODO Malo optymalne + FlowContext flowContext = new FlowContext(session); + flowContext.createLock(); + FlowHandler client = clientFactory.create(session.getProtocolId()); client.init(flowContext); flowContext.client(client); @@ -139,32 +134,12 @@ return flowContext; } - protected void disconnectAllConnections(boolean wait) { - for (FlowContext flowContext : sessions.values()) { - flowProcessor.disconnect(flowContext, wait); - } - } - @Override - public void disconnect(SessionInfo session) { - try { - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - flowProcessor.disconnect(flowContext, true); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - - @Override - public void disconnect() { + public void disconnectAll(boolean wait) { for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { FlowContext flowContext = entry.getValue(); try { - flowProcessor.disconnect(flowContext, true); + flowProcessor.disconnect(flowContext, wait); } catch (Exception e) { if (logger.isDebugEnabled()) { debug(logger, flowContext, e.getMessage(), e); @@ -178,6 +153,14 @@ } @Override + public void disconnect(SessionInfo session, boolean wait) { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + flowProcessor.disconnect(flowContext, wait); + } + } + + @Override public void handle(Event event) { Event newEvent = eventInstanceForWorker(event, index); try { @@ -230,7 +213,7 @@ */ private void process(Event event) { sleep(event); - if (logger.isTraceEnabled()) { + if (trace) { logger.trace("Event processing: {}", event); } @@ -260,7 +243,7 @@ flowContext = registerAndConnect(sessEvent.getSessionInfo(), true); } - if (flowContext.state == STATE_CONNECTED) { + if (flowContext != null && flowContext.state == STATE_CONNECTED) { flowProcessor.send(flowContext, (SessionPayloadEvent) event, true); } } @@ -269,7 +252,7 @@ logger.debug("DataLoopEnd received."); } - disconnectAllConnections(true); + disconnectAll(); filterChain.reset(); } else if (event.getType() == DataEvents.DataEnd.TYPE) { if (logger.isDebugEnabled()) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/AbstractFlowWorkerTest.java Tue Apr 07 13:34:22 2020 +0200 @@ -0,0 +1,316 @@ +package com.passus.st.client; + +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.data.ByteBuff; +import com.passus.data.DataEncoder; +import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.net.http.HttpResponseEncoder; +import com.passus.st.Log4jConfigurationFactory; +import com.passus.st.client.SynchFlowWorker.SynchWrapper; +import com.passus.st.emitter.*; +import com.passus.st.metric.MetricsContainer; +import com.passus.st.utils.EventUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.passus.st.utils.Assert.assertHttpClientEvents; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; + +public abstract class AbstractFlowWorkerTest { + + static { + Log4jConfigurationFactory.enableFactory("trace"); + } + + //public static final long JOIN_TIMEOUT = 10_000; + public static final long JOIN_TIMEOUT = Long.MAX_VALUE; + + protected final TestHttpClientListener listener = new TestHttpClientListener(); + + protected class LocalEmitter implements Emitter { + + private final Logger LOGGER = LogManager.getLogger(LocalEmitter.class); + + private SessionMapper sessionMapper; + + private boolean started = false; + + private final DataEncoder encoder; + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + public LocalEmitter() { + this(HttpResponseEncoder.INSTANCE); + } + + public LocalEmitter(DataEncoder encoder) { + this.encoder = encoder; + } + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + this.sessionMapper = sessionMapper; + } + + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + @Override + public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { + LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); + try { + handler.channelRegistered(channelContext); + handler.channelActive(channelContext); + } catch (Exception ex) { + LOGGER.debug(ex.getMessage(), ex); + } + } + + protected void flush(LocalChannelContext channelContext) { + FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler; + SessionPayloadEvent event = extractSentEvent(channelContext); + HttpResponse response = (HttpResponse) event.getResponse(); + + ByteBuff buff = new HeapByteBuff(); + encoder.encode(response, buff); + + executor.execute(() -> { + try { + flowProcessor.dataReceived(channelContext, buff); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + } + }); + } + + protected void close(LocalChannelContext channelContext) { + try { + channelContext.handler.channelInactive(channelContext); + channelContext.handler.channelUnregistered(channelContext); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (sessionMapper == null) { + sessionMapper = new PassThroughSessionMapper(); + } + + started = true; + } + + @Override + public void stop() { + started = false; + } + + @Override + public boolean isCollectMetrics() { + return false; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + } + + @Override + public void writeMetrics(MetricsContainer container) { + } + + @Override + public void configure(Configuration config, ConfigurationContext context) { + } + } + + protected static class LocalChannelContext implements ChannelContext { + + protected final LocalEmitter emitter; + + protected final EmitterHandler handler; + + protected final SessionInfo sessionInfo; + + protected final Queue<ByteBuffer> dataQueue; + + protected SocketAddress localAddress; + + protected SocketAddress remoteAddress; + + protected boolean bidirectional = true; + + private FlowContext flowContext; + + public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { + this.emitter = emitter; + this.handler = handler; + this.remoteAddress = remoteAddress; + this.sessionInfo = sessionInfo; + this.dataQueue = new LinkedList<>(); + } + + @Override + public boolean isBidirectional() { + return bidirectional; + } + + @Override + public void setBidirectional(boolean unidirectional) { + this.bidirectional = unidirectional; + } + + @Override + public boolean isConnected() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isConnectionPending() { + throw new UnsupportedOperationException("Not supported yet."); + } + + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + addToQeueu(ByteBuffer.wrap(data, offset, length)); + } + + @Override + public void write(ByteBuff data) throws IOException { + addToQeueu(data.toNioByteBuffer()); + } + + @Override + public void flush() throws IOException { + emitter.flush(this); + } + + @Override + public void close() throws IOException { + emitter.close(this); + } + + @Override + public SocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; + } + + @Override + public FlowContext getFlowContext() { + return flowContext; + } + + @Override + public void setFlowContext(FlowContext flowContext) { + this.flowContext = flowContext; + } + } + + protected FlowWorker createWorker() { + return createWorker(new LocalEmitter()); + } + + protected abstract FlowWorker createWorker(Emitter emitter); + + protected abstract SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext); + + private List<Event> readEvents(String pcapFile) throws Exception { + Map<String, Object> props = new HashMap<>(); + props.put("allowPartialSession", true); + props.put("ports", 4214); + return EventUtils.readEvents(pcapFile, props); + } + + @AfterMethod + public void afterMethod() { + listener.clear(); + } + + private void join(FlowWorker worker) { + try { + worker.join(JOIN_TIMEOUT); + } catch (InterruptedException ignore) { + + } + + assertFalse("Worker is still working.", worker.isWorking()); + } + + private List<Event> readDefaultEvents() throws Exception { + List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); + assertEquals(4, events.size()); + return events; + } + + @Test + public void testHandle_HTTP_SimpleRequestResponse() throws Exception { + List<Event> events = readDefaultEvents(); + FlowWorker worker = createWorker(); + worker.start(); + SessionEvent sessionEvent = (SessionEvent) events.get(0); + worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); + events.forEach(worker::handle); + join(worker); + + assertHttpClientEvents(events, listener.events()); + } + + @Test + public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception { + List<Event> events = readDefaultEvents(); + FlowWorker worker = createWorker(); + worker.setConnectPartialSession(true); + worker.start(); + events.forEach(worker::handle); + join(worker); + assertHttpClientEvents(events, listener.events()); + } + + @Test + public void testHandle_EmitterException_SendErrorsNotReached() throws Exception { + List<Event> events = readDefaultEvents(); + LocalEmitter emitter = new LocalEmitter((object, out) -> { + throw new RuntimeException("Test exception"); + }); + + FlowWorker worker = createWorker(emitter); + worker.setConnectPartialSession(true); + worker.start(); + events.forEach(worker::handle); + join(worker); + } +}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Tue Apr 07 13:34:22 2020 +0200 @@ -60,7 +60,6 @@ }) ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt()); worker.connect(session); - FlowContext flowContext = worker.flowContext(session); return channelContext; }
--- a/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/ParallelFlowWorkerTest.java Tue Apr 07 13:34:22 2020 +0200 @@ -1,65 +1,20 @@ package com.passus.st.client; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.emitter.RuleBasedSessionMapper; -import com.passus.st.emitter.nio.NioEmitter; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import static com.github.tomakehurst.wiremock.client.WireMock.*; - -public class ParallelFlowWorkerTest extends AbstractWireMockTest { +import com.passus.st.client.ParallelFlowWorker.FlowThread; +import com.passus.st.emitter.Emitter; - private NioEmitter prepareEmitter(String mapperRule) throws Exception { - RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); - sessionMapper.addRule(mapperRule); +public class ParallelFlowWorkerTest extends AbstractFlowWorkerTest { - NioEmitter emitter = new NioEmitter(); - emitter.setSessionMapper(sessionMapper); - return emitter; - } - - @BeforeMethod - public void beforeMethod() { - String content = "test"; - stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) - .willReturn(aResponse() - .withHeader("Content-Type", "text/plain") - .withHeader("Content-Length", "" + content.length()) - .withBody(content))); + @Override + protected FlowWorker createWorker(Emitter emitter) { + ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0); + worker.setListener(listener); + return worker; } - @Test(enabled = false) - public void testHandle() throws Exception { - /* Map<String, Object> props = new HashMap<>(); - props.put("allowPartialSession", true); - props.put("ports", 4214); - List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); - assertEquals(4, events.size()); - - NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); - emitter.start(); - - TestHttpClientListener listner = new TestHttpClientListener(); - - ParallelFlowWorker worker = new ParallelFlowWorker(emitter, "test", 0); - try { - worker.setListener(listner); - worker.start(); - - events.forEach(worker::handle); - - worker.join(2_000); - assertTrue(listner.size() > 0); - assertTrue(listner.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); - TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listner.get(0); - String responseStr = event.getResponse().toString(); - assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); - assertTrue(responseStr.endsWith("test")); - } finally { - ServiceUtils.stopQuietly(emitter); - }*/ - + protected SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext) { + FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler; + FlowThread flowThread = (FlowThread) flowProcessor.getSupervisor(); + return flowThread.flowContext().sentEvent(); } - } \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Mon Apr 06 14:13:39 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Tue Apr 07 13:34:22 2020 +0200 @@ -1,326 +1,24 @@ package com.passus.st.client; -import com.passus.config.Configuration; -import com.passus.config.ConfigurationContext; -import com.passus.data.ByteBuff; -import com.passus.data.DataEncoder; -import com.passus.data.HeapByteBuff; -import com.passus.net.SocketAddress; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.net.http.HttpResponseEncoder; -import com.passus.st.Log4jConfigurationFactory; -import com.passus.st.client.SynchFlowWorker.SynchWrapper; -import com.passus.st.emitter.*; -import com.passus.st.metric.MetricsContainer; -import com.passus.st.utils.EventUtils; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static com.passus.st.utils.Assert.assertHttpClientEvents; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; - -public class SynchFlowWorkerTest { - - public static final long JOIN_TIMEOUT = Long.MAX_VALUE; - - private final TestHttpClientListener listener = new TestHttpClientListener(); - - private static class LocalEmitter implements Emitter { - - private SessionMapper sessionMapper; - - private boolean started = false; - - private final DataEncoder encoder; - - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - - public LocalEmitter() { - this(HttpResponseEncoder.INSTANCE); - } - - public LocalEmitter(DataEncoder encoder) { - this.encoder = encoder; - } - - @Override - public void setSessionMapper(SessionMapper sessionMapper) { - this.sessionMapper = sessionMapper; - } - - @Override - public SessionMapper getSessionMapper() { - return sessionMapper; - } - - @Override - public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { - LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); - try { - handler.channelRegistered(channelContext); - handler.channelActive(channelContext); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - protected void flush(LocalChannelContext channelContext) { - SessionInfo sessionInfo = channelContext.getSessionInfo(); - FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler; - SynchWrapper wrapper = (SynchWrapper) flowProcessor.getSupervisor(); - SynchFlowWorker clientWorker = (SynchFlowWorker) wrapper.getFlowWorker(); - - FlowContext flowContext = clientWorker.flowContext(sessionInfo); - SessionPayloadEvent event = flowContext.sentEvent(); - - HttpRequest request = (HttpRequest) event.getRequest(); - HttpResponse response = (HttpResponse) event.getResponse(); - ByteBuff buff = new HeapByteBuff(); - encoder.encode(response, buff); - - executor.execute(() -> { - try { - flowProcessor.dataReceived(channelContext, buff); - } catch (Exception ex) { - ex.printStackTrace(); - } - }); - } - - protected void close(LocalChannelContext channelContext) { - try { - channelContext.handler.channelInactive(channelContext); - channelContext.handler.channelUnregistered(channelContext); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - @Override - public boolean isStarted() { - return started; - } - - @Override - public void start() { - if (sessionMapper == null) { - sessionMapper = new PassThroughSessionMapper(); - } - - started = true; - } +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; - @Override - public void stop() { - started = false; - } - - @Override - public boolean isCollectMetrics() { - return false; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - } - - @Override - public void writeMetrics(MetricsContainer container) { - } - - @Override - public void configure(Configuration config, ConfigurationContext context) { - } - } - - private static class LocalChannelContext implements ChannelContext { - - private final LocalEmitter emitter; - - private final EmitterHandler handler; - - private final SessionInfo sessionInfo; - - private final Queue<ByteBuffer> dataQueue; - - private SocketAddress localAddress; - - private SocketAddress remoteAddress; - - protected boolean bidirectional = true; - - private FlowContext flowContext; - - public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { - this.emitter = emitter; - this.handler = handler; - this.remoteAddress = remoteAddress; - this.sessionInfo = sessionInfo; - this.dataQueue = new LinkedList<>(); - } - - @Override - public boolean isBidirectional() { - return bidirectional; - } - - @Override - public void setBidirectional(boolean unidirectional) { - this.bidirectional = unidirectional; - } +public class SynchFlowWorkerTest extends AbstractFlowWorkerTest { - @Override - public boolean isConnected() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isConnectionPending() { - throw new UnsupportedOperationException("Not supported yet."); - } - - private void addToQeueu(ByteBuffer buffer) throws IOException { - dataQueue.add(buffer); - } - - @Override - public void write(byte[] data, int offset, int length) throws IOException { - addToQeueu(ByteBuffer.wrap(data, offset, length)); - } - - @Override - public void write(ByteBuff data) throws IOException { - addToQeueu(data.toNioByteBuffer()); - } - - @Override - public void flush() throws IOException { - emitter.flush(this); - } - - @Override - public void close() throws IOException { - emitter.close(this); - } - - @Override - public SocketAddress getLocalAddress() { - return localAddress; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SessionInfo getSessionInfo() { - return sessionInfo; - } - - @Override - public FlowContext getFlowContext() { - return flowContext; - } - - @Override - public void setFlowContext(FlowContext flowContext) { - this.flowContext = flowContext; - } - } - - private SynchFlowWorker createWorker() { - LocalEmitter emitter = new LocalEmitter(); + @Override + protected FlowWorker createWorker(Emitter emitter) { SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0); worker.setListener(listener); return worker; } - private List<Event> readEvents(String pcapFile) throws Exception { - Map<String, Object> props = new HashMap<>(); - props.put("allowPartialSession", true); - props.put("ports", 4214); - return EventUtils.readEvents(pcapFile, props); - } - - @AfterMethod - public void afterMethod() { - listener.clear(); - } - - private void join(SynchFlowWorker worker) { - try { - worker.join(JOIN_TIMEOUT); - } catch (InterruptedException ignore) { - - } - - assertFalse("Worker is still working.", worker.isWorking()); - } - - private List<Event> readDefaultEvents() throws Exception { - List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); - assertEquals(4, events.size()); - return events; - } - - @Test - public void testConnectSuccess() { - + protected SessionPayloadEvent extractSentEvent(LocalChannelContext channelContext) { + SessionInfo sessionInfo = channelContext.getSessionInfo(); + FlowProcessor flowProcessor = (FlowProcessor) channelContext.handler; + SynchFlowWorker.SynchWrapper wrapper = (SynchFlowWorker.SynchWrapper) flowProcessor.getSupervisor(); + SynchFlowWorker clientWorker = (SynchFlowWorker) wrapper.getFlowWorker(); + FlowContext flowContext = clientWorker.flowContext(sessionInfo); + return flowContext.sentEvent(); } - @Test - public void testHandle_HTTP_SimpleRequestResponse() throws Exception { - List<Event> events = readDefaultEvents(); - SynchFlowWorker worker = createWorker(); - worker.start(); - SessionEvent sessionEvent = (SessionEvent) events.get(0); - worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); - events.forEach(worker::handle); - join(worker); - - assertHttpClientEvents(events, listener.events()); - } - - @Test - public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception { - List<Event> events = readDefaultEvents(); - SynchFlowWorker worker = createWorker(); - worker.setConnectPartialSession(true); - worker.start(); - events.forEach(worker::handle); - join(worker); - assertHttpClientEvents(events, listener.events()); - } - - @Test - public void testHandle_EmitterException_SendErrorsNotReached() throws Exception { - List<Event> events = readDefaultEvents(); - LocalEmitter emitter = new LocalEmitter((object, out) -> { - throw new RuntimeException("Test exception"); - }); - SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0); - worker.setListener(listener); - worker.setConnectPartialSession(true); - worker.start(); - events.forEach(worker::handle); - join(worker); - } - - @Test(enabled = false) - public void testHandle_EncoderException() throws Exception { - List<Event> events = readDefaultEvents(); - SynchFlowWorker worker = createWorker(); - /*worker.setClientFactory(protocolId -> { - - });*/ - } } \ No newline at end of file