Mercurial > stress-tester
changeset 1008:469c65b34298
FlowWorkerBase refactoring in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Fri Mar 20 14:49:19 2020 +0100 @@ -148,23 +148,6 @@ } @Override - public void sessionInvalidated(SessionInfo session) throws Exception { - synchronized (lock) { - if (logger.isDebugEnabled()) { - logger.debug("Session {} invalidated.", session); - } - - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - - addBlockedSession(session); - lock.notifyAll(); - } - } - - @Override protected void flowStateChanged(FlowContext context, int oldState) { synchronized (lock) { flowStateChanged = true; @@ -251,7 +234,7 @@ FlowContext flowContext = flowContext(statusEvent); if (flowContext != null) { if (flowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); + disconnect(statusEvent); } } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Fri Mar 20 14:49:19 2020 +0100 @@ -25,6 +25,8 @@ protected final SessionInfo session; + protected boolean blocked; + protected ByteBuff buffer; protected ChannelContext channelContext; @@ -45,6 +47,14 @@ private boolean bidirectional = true; + protected int connectionAttempts; + + protected int encoderErrors; + + protected int decoderErrors; + + protected int sendErrors; + @Deprecated protected DataDecoder decoder; @@ -155,6 +165,10 @@ return session; } + public boolean isBlocked() { + return blocked; + } + public Object getParam(String name) { if (params == null) { return null;
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Fri Mar 20 14:49:19 2020 +0100 @@ -269,7 +269,7 @@ } for (FlowWorker worker : workers) { - worker.close(); + worker.disconnect(); worker.interrupt(); try { @@ -284,7 +284,7 @@ public void closeAllConnections() { for (FlowWorker worker : workers) { - worker.close(); + worker.disconnect(); } }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Fri Mar 20 14:49:19 2020 +0100 @@ -20,7 +20,7 @@ defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L); defaultTimeouts.put(FlowContext.STATE_REQ_SENT, 30_000L); defaultTimeouts.put(FlowContext.STATE_RESP_RECEIVED, 60_000L); - defaultTimeouts.put(FlowContext.STATE_ERROR, 60_000L); + defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE); defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L); defaultTimeouts.put(STATE_DISCONNECTED, 0L); DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri Mar 20 14:49:19 2020 +0100 @@ -128,9 +128,9 @@ public abstract int activeConnections(); - public abstract void close(); + public abstract void disconnect(); - public abstract void close(SessionInfo session); + public abstract void disconnect(SessionInfo session); public abstract void handle(Event event);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Fri Mar 20 14:49:19 2020 +0100 @@ -12,6 +12,7 @@ import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; import it.unimi.dsi.fastutil.ints.Int2LongMap; +import java.io.IOException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -38,6 +39,12 @@ private long nextCheckTimeoutsTime = -1; + private int maxConnectionAttempts = 3; + + private int maxEncoderErrors = 3; + + private long reconnectDelay = 1000; + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; private long lastEventTimestamp = -1; @@ -92,7 +99,25 @@ this.checkTimeoutsPeriod = checkTimeoutsPeriod; } - protected final void changeFlowState(FlowContext flowContext, int state) { + public long getReconnectDelay() { + return reconnectDelay; + } + + public void setReconnectDelay(long reconnectDelay) { + this.reconnectDelay = reconnectDelay; + } + + public int getMaxEncoderErrors() { + return maxEncoderErrors; + } + + public void setMaxEncoderErrors(int maxEncoderErrors) { + this.maxEncoderErrors = maxEncoderErrors; + } + + + + /*protected final void changeFlowState(FlowContext flowContext, int state) { try { if (flowContext.state() == state) { return; @@ -108,20 +133,12 @@ switch (state) { case FlowContext.STATE_CONNECTING: - flowContext.clear(); - break; case FlowContext.STATE_CONNECTED: - flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); - break; case FlowContext.STATE_ERROR: - changeFlowState(flowContext, STATE_DISCONNECTED); - break; case FlowContext.STATE_RESP_RECEIVED: - flowContext.sentEvent(null); - flowContext.receivedStartTimestamp(-1); - break; case FlowContext.STATE_DISCONNECTING: - if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { + throw new RuntimeException("Removed."); + *//*if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { if (flowContext.channelContext() 
!= null) { try { flowContext.channelContext().close(); @@ -135,8 +152,7 @@ } } else { return; - } - break; + }*//* case STATE_DISCONNECTED: flowContext.state(STATE_DISCONNECTED); flowContext.clear(); @@ -145,14 +161,12 @@ return; } - long timeout = timeouts.get(flowContext.state()); - flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); - flowContext.state(state); - flowStateChanged(flowContext, oldState); + updateFlowState(flowContext, state, oldState); + throw new RuntimeException("Removed."); } catch (Exception e) { logger.debug(e.getMessage(), e); } - } + }*/ protected void flowStateChanged(FlowContext context, int oldState) { @@ -211,8 +225,7 @@ try { FlowContext flowContext = register(session); if (flowContext != null) { - emitter.connect(session, this, index); - return flowContext; + connect(flowContext); } } catch (Exception e) { logger.error(e.getMessage(), e); @@ -221,13 +234,24 @@ } } + protected void connect(FlowContext flowContext) { + synchronized (lock) { + try { + flowContext.connectionAttempts++; + emitter.connect(flowContext.session, this, index); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + @Override - public void close() { + public void disconnect() { synchronized (lock) { for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { FlowContext flowContext = entry.getValue(); try { - closeSession(flowContext); + disconnect(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); @@ -240,14 +264,18 @@ } } - protected void close(SessionEvent sessionEvent) { - close(sessionEvent.getSessionInfo()); + protected void disconnect(SessionEvent sessionEvent) { + disconnect(sessionEvent.getSessionInfo()); } - protected void close(FlowContext flowContext) { + @Override + public void disconnect(SessionInfo session) { synchronized (lock) { try { - closeSession(flowContext); + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + 
disconnect(flowContext, true); + } } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); @@ -256,26 +284,61 @@ } } - @Override - public void close(SessionInfo session) { - synchronized (lock) { + protected void disconnect(FlowContext flowContext) { + disconnect(flowContext, true); + } + + protected void disconnect(FlowContext flowContext, boolean removeFlow) { + if (flowContext.channelContext() != null) { + updateFlowState(flowContext, STATE_DISCONNECTING); try { - FlowContext flowContext = flowContext(session); - closeSession(flowContext); + flowContext.channelContext().close(); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); } } + + flowContext.clear(); + if (removeFlow) { + removeFlowContext(flowContext); + } + + updateFlowState(flowContext, STATE_DISCONNECTED); } } - protected void closeSession(FlowContext flowContext) { - synchronized (lock) { - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } + private void error(FlowContext flowContext, Throwable cause) { + if (flowContext.state >= STATE_CONNECTED && flowContext.state < STATE_DISCONNECTING) { + disconnect(flowContext, false); } + + flowContext.blocked = true; + updateFlowState(flowContext, STATE_ERROR); + } + + private void responseReceived(FlowContext flowContext) { + flowContext.sentEvent(null); + flowContext.receivedStartTimestamp(-1); + updateFlowState(flowContext, STATE_RESP_RECEIVED); + } + + private void updateFlowState(FlowContext flowContext, int newState) { + updateFlowState(flowContext, newState, flowContext.state); + } + + private void updateFlowState(FlowContext flowContext, int newState, int oldState) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow status changing {} -> {}.", + contextStateToString(flowContext.state()), + contextStateToString(newState) + ); + } + + long timeout = timeouts.get(newState); + flowContext.timeout(timeGenerator.currentTimeMillis() + 
timeout); + flowContext.state(newState); + flowStateChanged(flowContext, oldState); } protected void removeFlowContext(FlowContext flowContext) { @@ -293,7 +356,7 @@ } SessionInfo session = flowContext.sessionInfo(); - changeFlowState(flowContext, FlowContext.STATE_CONNECTING); + updateFlowState(flowContext, FlowContext.STATE_CONNECTING); emitter.connect(session, this, index); } catch (Exception e) { error(flowContext, e.getMessage(), e); @@ -304,7 +367,7 @@ protected void closeAllConnections() { synchronized (lock) { for (FlowContext flowContext : sessions.values()) { - closeSession(flowContext); + disconnect(flowContext); } } } @@ -355,7 +418,9 @@ context.setBidirectional(flowContext.isBidirectional()); flowContext.channelContext(context); context.setAttachment(flowContext); - changeFlowState(flowContext, STATE_CONNECTED); + flowContext.connectionAttempts = 0; + flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); + updateFlowState(flowContext, STATE_CONNECTED); } lock.notifyAll(); @@ -370,7 +435,24 @@ debug(flowContext, "Channel inactive."); } - changeFlowState(flowContext, STATE_DISCONNECTED); + updateFlowState(flowContext, STATE_DISCONNECTED); + lock.notifyAll(); + } + } + + @Override + public void sessionInvalidated(SessionInfo session) throws Exception { + synchronized (lock) { + if (logger.isDebugEnabled()) { + logger.debug("Session {} invalidated.", session); + } + + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + disconnect(flowContext); + } + + addBlockedSession(session); lock.notifyAll(); } } @@ -401,7 +483,7 @@ } decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + responseReceived(flowContext); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { if (collectMetric) { synchronized (metric) { @@ -425,7 +507,7 @@ } decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + responseReceived(flowContext); } } catch 
(Exception e) { if (collectMetric) { @@ -469,7 +551,7 @@ flowContext.client().onDataWriteEnd(flowContext); if (!flowContext.isBidirectional()) { - changeFlowState(flowContext, STATE_RESP_RECEIVED); + responseReceived(flowContext); } } @@ -478,14 +560,36 @@ } @Override - public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { + public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Error occurred. " + cause.getMessage(), cause); } synchronized (lock) { FlowContext flowContext = (FlowContext) context.getAttachment(); - changeFlowState(flowContext, FlowContext.STATE_ERROR); + //Jezeli nie nastapilo polaczenie flowContext == null + if (flowContext == null) { + flowContext = flowContext(context); + } + + if (flowContext.state == STATE_CONNECTING) { + if (flowContext.connectionAttempts < maxConnectionAttempts) { + //TODO - malo optymalne, blokuje przetwarzanie eventow dla konkretnej sesji. + // Odbije sie nw wydajnosci workera asynch. 
+ if (reconnectDelay > 0) { + try { + Thread.sleep(reconnectDelay); + } catch (InterruptedException ignore) { + + } + } + + connect(flowContext); + return; + } + } + + error(flowContext, cause); lock.notifyAll(); } } @@ -498,10 +602,24 @@ return false; } + ByteBuff buffer = null; FlowHandler client = flowContext.client(); FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); - ByteBuff buffer = flowContext.buffer(); - encoder.encode(req, flowContext, buffer); + buffer = flowContext.buffer(); + try { + encoder.encode(req, flowContext, buffer); + } catch (Exception e) { + flowContext.encoderErrors++; + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + + if (flowContext.encoderErrors == maxEncoderErrors) { + error(flowContext, new IOException()); + } + + return false; + } if (collectMetric) { synchronized (metric) { @@ -511,12 +629,13 @@ } try { - changeFlowState(flowContext, FlowContext.STATE_REQ_SENT); + updateFlowState(flowContext, FlowContext.STATE_REQ_SENT); flowContext.sentEvent(event); flowContext.channelContext().writeAndFlush(buffer); buffer.clear(); return true; } catch (Exception e) { + flowContext.sendErrors++; if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); } @@ -548,7 +667,7 @@ case FlowContext.STATE_CONNECTED: case FlowContext.STATE_REQ_SENT: case FlowContext.STATE_ERROR: - closeSession(flowContext); + disconnect(flowContext); break; case FlowContext.STATE_RESP_RECEIVED: //Dziwny blad nie powinien wystepowac
--- a/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/NullFlowWorker.java Fri Mar 20 14:49:19 2020 +0100 @@ -20,12 +20,12 @@ } @Override - public void close() { + public void disconnect() { } @Override - public void close(SessionInfo session) { + public void disconnect(SessionInfo session) { }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Fri Mar 20 14:49:19 2020 +0100 @@ -110,7 +110,7 @@ LocalFlowContext indexFlowContext = it.next(); if (indexFlowContext.eventsQueue.isEmpty() && indexFlowContext.state() != FlowContext.STATE_REQ_SENT) { - close(flowContext); + disconnect(flowContext); if (--diff == 0) { break; } @@ -142,7 +142,7 @@ if (localFlowContext.state() < FlowContext.STATE_DISCONNECTING && localFlowContext.state() != FlowContext.STATE_REQ_SENT && localFlowContext.eventsQueue.isEmpty()) { - close(flowContext); + disconnect(flowContext); return; } } @@ -157,7 +157,7 @@ SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { localFlowContext.eventsQueue.poll(); - close((SessionStatusEvent) event); + disconnect((SessionStatusEvent) event); } } else if (event.getType() == SessionPayloadEvent.TYPE && canSend(flowContext)) { @@ -217,7 +217,7 @@ if (flowContext != null) { if (flowContext.eventsQueue.isEmpty() && flowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); + disconnect(statusEvent); } else { addToQueue(flowContext, event); }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri Mar 20 14:49:19 2020 +0100 @@ -48,23 +48,6 @@ } @Override - public void sessionInvalidated(SessionInfo session) throws Exception { - synchronized (lock) { - if (logger.isDebugEnabled()) { - logger.debug("Session {} invalidated.", session); - } - - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - - addBlockedSession(session); - lock.notifyAll(); - } - } - - @Override protected void flowStateChanged(FlowContext context, int oldState) { if (logger.isDebugEnabled()) { logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state())); @@ -131,10 +114,6 @@ * @return boolean */ private boolean pollNext() { - if (currFlowContext != null) { - return false; - } - Event event = eventsQueue.poll(); if (event != null) { sleep(event); @@ -166,7 +145,7 @@ currFlowContext = flowContext((SessionEvent) event); if (currFlowContext != null) { if (currFlowContext.state() != FlowContext.STATE_REQ_SENT) { - close(statusEvent); + disconnect(statusEvent); } } } @@ -174,6 +153,10 @@ return true; } else if (event.getType() == SessionPayloadEvent.TYPE) { FlowContext flowContext = flowContext(sessEvent); + if (flowContext.blocked) { + return true; + } + if (flowContext != null) { switch (flowContext.state()) { case FlowContext.STATE_CONNECTED: @@ -256,10 +239,10 @@ } @Override - public void close() { + public void disconnect() { synchronized (lock) { eventsQueue.clear(); - super.close(); + super.disconnect(); lock.notifyAll(); } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Fri Mar 20 14:49:19 2020 +0100 @@ -4,6 +4,7 @@ import com.passus.commons.time.TimeAware; import com.passus.commons.time.TimeGenerator; import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; import com.passus.st.client.FlowContext; import com.passus.st.client.FlowHandler; import com.passus.st.client.FlowHandlerDataDecoder; @@ -16,9 +17,9 @@ public class HttpFlowHandler implements FlowHandler, TimeAware { - private final HttpFlowHandlerDataDecoder decoder; + private final FlowHandlerDataDecoder<HttpResponse> decoder; - private final HttpFlowHandlerDataEncoder encoder; + private final FlowHandlerDataEncoder<HttpRequest> encoder; TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); @@ -34,6 +35,12 @@ scopes = new HttpScopes(); } + public HttpFlowHandler(FlowHandlerDataDecoder<HttpResponse> decoder, + FlowHandlerDataEncoder<HttpRequest> encoder) { + this.decoder = decoder; + this.encoder = encoder; + } + @Override public int getProtocolId() { return HTTP;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Fri Mar 20 14:49:19 2020 +0100 @@ -55,7 +55,11 @@ } public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort) { - this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort); + this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT, UNKNOWN); + } + + public SessionInfo(String srcIp, int srcPort, String dstIp, int dstPort, int transport, int protocolId) { + this(IpAddress.parse(srcIp), srcPort, IpAddress.parse(dstIp), dstPort, transport, protocolId); } public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Fri Mar 20 14:49:19 2020 +0100 @@ -0,0 +1,159 @@ +package com.passus.st.client; + +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.utils.TriFunction; +import org.apache.commons.lang3.mutable.MutableInt; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.ConnectException; + +import static com.passus.net.session.Session.PROTOCOL_TCP; +import static com.passus.st.Protocols.HTTP; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.*; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNull; + +public class FlowWorkerBaseTest { + + private final SessionInfo session = new SessionInfo("1.1.1.1", 10, "2.2.2.2", 20, PROTOCOL_TCP, HTTP); + + private final Emitter mockEmitter = mock(Emitter.class); + + private ChannelContext mockChannelContext() { + return mockChannelContext(session); + } + + private TestFlowWorker worker() { + TestFlowWorker worker = new TestFlowWorker(mockEmitter); + worker.setReconnectDelay(0); + return worker; + } + + private ChannelContext mockChannelContext(SessionInfo session) { + ChannelContext mockChannelContext = mock(ChannelContext.class); + when(mockChannelContext.getSessionInfo()).thenReturn(session); + return mockChannelContext; + } + + private Void applyConnect(InvocationOnMock invocation, TriFunction<SessionInfo, EmitterHandler, Integer, Void> callback) throws Exception { + Object[] arguments = invocation.getArguments(); + SessionInfo session = (SessionInfo) arguments[0]; + EmitterHandler handler = (EmitterHandler) arguments[1]; + Integer index = 
(Integer) arguments[2]; + + callback.apply(session, handler, index); + return null; + } + + private ChannelContext makeConnected(TestFlowWorker worker, SessionInfo session) throws Exception { + ChannelContext channelContext = mockChannelContext(); + doAnswer((Answer<Void>) invocation -> + applyConnect(invocation, (sessionInfo, handler, integer) -> { + handler.channelActive(channelContext); + return null; + }) + ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt()); + worker.connect(session); + FlowContext flowContext = worker.flowContext(session); + return channelContext; + } + + @Test + public void testConnectionSuccess() throws Exception { + TestFlowWorker worker = worker(); + makeConnected(worker, session); + + FlowContext flowContext = worker.flowContext(session); + assertEquals(FlowContext.STATE_CONNECTED, flowContext.state); + assertEquals(false, flowContext.blocked); + } + + @Test + public void testConnectionSuccess_FirstConnectionFailed() throws Exception { + ChannelContext channelContext = mockChannelContext(); + MutableInt count = new MutableInt(0); + doAnswer((Answer<Void>) invocation -> + applyConnect(invocation, (sessionInfo, handler, integer) -> { + + if (count.intValue() == 1) { + handler.channelActive(channelContext); + } else { + count.incrementAndGet(); + handler.errorOccurred(channelContext, new ConnectException("connection refused")); + } + return null; + }) + ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt()); + + TestFlowWorker worker = worker(); + worker.connect(session); + FlowContext flowContext = worker.flowContext(session); + + assertEquals(1, count.intValue()); + assertEquals(FlowContext.STATE_CONNECTED, flowContext.state); + assertEquals(0, flowContext.connectionAttempts); + assertEquals(false, flowContext.blocked); + } + + @Test + public void testConnectionFailed_ConnectionAttemptsReached() throws Exception { + ChannelContext channelContext = mockChannelContext(); 
+ doAnswer((Answer<Void>) invocation -> + applyConnect(invocation, (sessionInfo, handler, integer) -> { + handler.errorOccurred(channelContext, new ConnectException("connection refused")); + return null; + }) + ).when(mockEmitter).connect(any(SessionInfo.class), any(EmitterHandler.class), anyInt()); + + TestFlowWorker worker = worker(); + worker.connect(session); + FlowContext flowContext = worker.flowContext(session); + + assertEquals(FlowContext.STATE_ERROR, flowContext.state); + assertEquals(true, flowContext.blocked); + } + + @Test + public void testDisconnect() throws Exception { + TestFlowWorker worker = worker(); + makeConnected(worker, session); + + FlowContext flowContext = worker.flowContext(session); + worker.disconnect(session); + + assertEquals(FlowContext.STATE_DISCONNECTED, flowContext.state); + assertEquals(null, flowContext.buffer); + assertNull(worker.flowContext(session)); + } + + public class TestFlowWorker extends FlowWorkerBase { + + public TestFlowWorker(Emitter emitter) { + this(emitter, "test", 0); + } + + public TestFlowWorker(Emitter emitter, String name, int index) { + super(emitter, name, index); + } + + @Override + protected FlowContext flowContext(SessionInfo session) { + return super.flowContext(session); + } + + @Override + public void handle(Event event) { + + } + + } + +} \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Fri Mar 20 14:49:19 2020 +0100 @@ -3,6 +3,7 @@ import com.passus.config.Configuration; import com.passus.config.ConfigurationContext; import com.passus.data.ByteBuff; +import com.passus.data.DataEncoder; import com.passus.data.HeapByteBuff; import com.passus.net.SocketAddress; import com.passus.net.http.HttpRequest; @@ -20,9 +21,13 @@ import static com.passus.st.utils.Assert.assertHttpClientEvents; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; public class SynchFlowWorkerTest { + public static final long JOIN_TIMEOUT = 1_000; + + private final TestHttpClientListener listener = new TestHttpClientListener(); private static class LocalEmitter implements Emitter { @@ -31,6 +36,16 @@ private boolean started = false; + private final DataEncoder encoder; + + public LocalEmitter() { + this(HttpResponseEncoder.INSTANCE); + } + + public LocalEmitter(DataEncoder encoder) { + this.encoder = encoder; + } + @Override public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; @@ -61,7 +76,7 @@ HttpRequest request = (HttpRequest) event.getRequest(); HttpResponse response = (HttpResponse) event.getResponse(); ByteBuff buff = new HeapByteBuff(); - HttpResponseEncoder.INSTANCE.encode(response, buff); + encoder.encode(response, buff); try { clientWorker.dataReceived(channelContext, buff); } catch (Exception ex) { @@ -230,32 +245,66 @@ listener.clear(); } + private void join(SynchFlowWorker worker) { + try { + worker.join(JOIN_TIMEOUT); + } catch (InterruptedException ignore) { + + } + + assertFalse(worker.isWorking()); + } + + private List<Event> readDefaultEvents() throws Exception { + List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); + assertEquals(4, events.size()); + return events; + } + 
@Test public void testHandle_HTTP_SimpleRequestResponse() throws Exception { - List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); - assertEquals(4, events.size()); - + List<Event> events = readDefaultEvents(); SynchFlowWorker worker = createWorker(); worker.start(); SessionEvent sessionEvent = (SessionEvent) events.get(0); worker.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); events.forEach(worker::handle); - worker.join(); + join(worker); assertHttpClientEvents(events, listener.events()); } @Test public void testHandle_HTTP_SimpleRequestResponse_ConnectPartialSession() throws Exception { - List<Event> events = readEvents("pcap/http/http_req_resp.pcap"); - assertEquals(4, events.size()); - + List<Event> events = readDefaultEvents(); SynchFlowWorker worker = createWorker(); worker.setConnectPartialSession(true); worker.start(); events.forEach(worker::handle); - worker.join(); - + join(worker); assertHttpClientEvents(events, listener.events()); } + + @Test + public void testHandle_EmitterException() throws Exception { + List<Event> events = readDefaultEvents(); + LocalEmitter emitter = new LocalEmitter((object, out) -> { + throw new RuntimeException("Test exception"); + }); + SynchFlowWorker worker = new SynchFlowWorker(emitter, "test", 0); + worker.setListener(listener); + worker.setConnectPartialSession(true); + worker.start(); + events.forEach(worker::handle); + join(worker); + } + + @Test(enabled = false) + public void testHandle_EncoderException() throws Exception { + List<Event> events = readDefaultEvents(); + SynchFlowWorker worker = createWorker(); + /*worker.setClientFactory(protocolId -> { + + });*/ + } } \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Wed Mar 18 13:29:01 2020 +0100 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Fri Mar 20 14:49:19 2020 +0100 @@ -53,7 +53,7 @@ try { flowExecutor.start(); events.forEach(flowExecutor::handle); - flowExecutor.join(5_000); + flowExecutor.join(); assertEquals(0, listener.getErrors()); assertEquals(2, listener.getReceived().size());
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/TriFunction.java Fri Mar 20 14:49:19 2020 +0100 @@ -0,0 +1,8 @@ +package com.passus.st.utils; + +@FunctionalInterface +public interface TriFunction<A, B, C, R> { + + R apply(A a, B b, C c) throws Exception; + +} \ No newline at end of file