Mercurial > stress-tester
changeset 1031:37d098b33b23
FlowWorkerBase refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu Apr 02 14:02:38 2020 +0200 @@ -249,9 +249,10 @@ case FlowContext.STATE_CONNECTING: return false; case FlowContext.STATE_CONNECTED: - if(flowContext.isEventSent()) { + if (flowContext.isEventSent()) { return false; - } else if (send(flowContext, (SessionPayloadEvent) event)) { + } else { + send(flowContext, (SessionPayloadEvent) event, true); return true; } case FlowContext.STATE_DISCONNECTING:
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Apr 02 14:02:38 2020 +0200 @@ -8,14 +8,17 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; public class FlowContext { - public static final byte STATE_ERROR = -1; - public static final byte STATE_CONNECTING = 0; - public static final byte STATE_CONNECTED = 1; - public static final byte STATE_DISCONNECTING = 4; - public static final byte STATE_DISCONNECTED = 5; + public static final byte STATE_NEW = 0; + public static final byte STATE_CONNECTING = 1 << 1; + public static final byte STATE_CONNECTED = 1 << 2; + public static final byte STATE_DISCONNECTING = 1 << 3; + public static final byte STATE_DISCONNECTED = 1 << 4; + public static final byte STATE_ERROR = 1 << 5; public static final int INIT_BUFFER_CAPACITY = 1024; @@ -23,7 +26,9 @@ protected final SessionInfo session; - protected boolean blocked; + protected volatile boolean blocked; + + protected volatile boolean sessionEstablishedSeen; protected ByteBuff buffer; @@ -33,17 +38,19 @@ protected byte state = STATE_CONNECTING; + protected long connectionTime = -1; + protected long timeout = -1; protected long receivedStartTime = -1; protected long sendStartTime = -1; - private int loop; + protected int loop; - private FlowHandler client; + protected FlowHandler client; - private boolean bidirectional = true; + protected boolean bidirectional = true; protected int connectionAttempts; @@ -55,6 +62,10 @@ protected FlowError error; + protected ReentrantLock lock; + + protected Condition lockCond; + @Deprecated protected DataDecoder decoder; @@ -64,6 +75,23 @@ this.session = session; } + public void createLock() { + lock = new ReentrantLock(); + lockCond = lock.newCondition(); + } + + void lock() { + lock.lock(); + } + + void unlock() { + 
lock.unlock(); + } + + void signal() { + lockCond.signal(); + } + public boolean isBidirectional() { return bidirectional; } @@ -96,6 +124,14 @@ return state; } + public boolean isError() { + return state == STATE_ERROR; + } + + public boolean isConnected() { + return state == STATE_CONNECTED; + } + public void timeout(long timeout) { this.timeout = timeout; } @@ -118,10 +154,6 @@ return error; } - public boolean isError() { - return error != null; - } - public long receivedStartTimestamp() { return receivedStartTime; } @@ -231,6 +263,7 @@ } public void clear() { + sessionEstablishedSeen = false; buffer = null; sentEvent = null; timeout = -1; @@ -244,10 +277,13 @@ @Override public String toString() { - return "FlowContext{state=" + contextStateToString(state) + '}'; + return "FlowContext{" + + "session=" + session + + ", state=" + stateToString(state) + + '}'; } - public static String contextStateToString(int state) { + public static String stateToString(int state) { switch (state) { case STATE_ERROR: return "error"; @@ -255,10 +291,6 @@ return "connecting"; case STATE_CONNECTED: return "connected"; -/* case STATE_REQ_SENT: - return "req_sent"; - case STATE_RESP_RECEIVED: - return "resp_received";*/ case STATE_DISCONNECTING: return "disconnecting"; case STATE_DISCONNECTED:
--- a/stress-tester/src/main/java/com/passus/st/client/FlowError.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowError.java Thu Apr 02 14:02:38 2020 +0200 @@ -33,6 +33,8 @@ public static final byte CODE_SSL_UNRECOGNIZED_MESSAGE_ERROR = 18; public static final byte CODE_SSL_HANDSHAKE_ERROR = 19; + public static final byte CODE_INTERNAL_ERROR = (byte) 255; + private final byte code; private final String message; @@ -78,6 +80,14 @@ return new FlowError(CODE_UNKNOWN_ERROR, "Unknown error.", cause); } + public static FlowError internalError() { + return internalError(null); + } + + public static FlowError internalError(Throwable cause) { + return new FlowError(CODE_INTERNAL_ERROR, "Internal error.", cause); + } + public static FlowError interpret(Throwable ex, String stackTrace) { if (stackTrace.contains("Connection refused")) { return new FlowError(CODE_CONNECTION_REFUSED, "Connection refused.", ex);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowUtils.java Thu Apr 02 14:02:38 2020 +0200 @@ -12,18 +12,6 @@ public class FlowUtils { - public static final Map<Byte, Long> DEFAULT_TIMEOUTS; - - static { - Map<Byte, Long> defaultTimeouts = new HashMap<>(); - defaultTimeouts.put(FlowContext.STATE_CONNECTING, 10_000L); - defaultTimeouts.put(FlowContext.STATE_CONNECTED, 60_000L); - defaultTimeouts.put(FlowContext.STATE_ERROR, Long.MAX_VALUE); - defaultTimeouts.put(FlowContext.STATE_DISCONNECTING, 2_000L); - defaultTimeouts.put(STATE_DISCONNECTED, 0L); - DEFAULT_TIMEOUTS = Collections.unmodifiableMap(defaultTimeouts); - } - private FlowUtils() { }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Thu Apr 02 14:02:38 2020 +0200 @@ -35,11 +35,14 @@ protected TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + protected final boolean trace; + public FlowWorker(Emitter emitter, String name, int index) { super(name + index); Assert.notNull(emitter, "emitter"); this.emitter = emitter; this.index = index; + this.trace = logger.isTraceEnabled(); } public boolean isConnectPartialSession() {
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 02 14:02:38 2020 +0200 @@ -9,18 +9,15 @@ import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; import com.passus.st.metric.MetricsContainer; -import it.unimi.dsi.fastutil.bytes.Byte2LongArrayMap; -import it.unimi.dsi.fastutil.bytes.Byte2LongMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import static com.passus.st.client.FlowContext.*; -import static com.passus.st.client.FlowError.CODE_CONNECTION_ATTEMPTS_REACHED; -import static com.passus.st.client.FlowError.CODE_MAX_ENCODER_ERRORS_REACHED; -import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; +import static com.passus.st.client.FlowError.*; public abstract class FlowWorkerBase extends FlowWorker { @@ -30,7 +27,7 @@ private final Set<SessionInfo> blockedSessions = new HashSet<>(); - private final Byte2LongMap timeouts = new Byte2LongArrayMap(); + private final Timeouts timeouts = new Timeouts(); protected final Object lock = new Object(); @@ -52,7 +49,6 @@ public FlowWorkerBase(Emitter emitter, String name, int index) { super(emitter, name, index); - timeouts.putAll(DEFAULT_TIMEOUTS); } @Override @@ -140,7 +136,9 @@ } protected FlowContext createFlowContext(SessionInfo session) { - return new FlowContext(session); + FlowContext flowContext = new FlowContext(session); + flowContext.createLock(); + return flowContext; } protected FlowContext register(SessionEvent sessionEvent) { @@ -148,20 +146,18 @@ } protected FlowContext register(SessionInfo session) { - synchronized (lock) { - if (sessions.containsKey(session)) { - logger.warn("Unable to register session '" + session + "'. 
Session already registered."); - return null; - } + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. Session already registered."); + return null; + } - FlowContext flowContext = createFlowContext(session); - //TODO Malo optymalne - FlowHandler client = clientFactory.create(session.getProtocolId()); - client.init(flowContext); - flowContext.client(client); - sessions.put(session, flowContext); - return flowContext; - } + FlowContext flowContext = createFlowContext(session); + //TODO Malo optymalne + FlowHandler client = clientFactory.create(session.getProtocolId()); + client.init(flowContext); + flowContext.client(client); + sessions.put(session, flowContext); + return flowContext; } protected FlowContext connect(SessionEvent sessionEvent) { @@ -173,43 +169,61 @@ try { FlowContext flowContext = register(session); if (flowContext != null) { - connect(flowContext); + connect(flowContext, true); } } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error(e); } return null; } } + protected FlowContext registerAndConnect(SessionInfo session, boolean wait) { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + throw new RuntimeException("Not implemented yet."); + } else { + flowContext = register(session); + } + + connect(flowContext, wait); + return flowContext; + } + protected void connect(FlowContext flowContext) { - synchronized (lock) { - try { - flowContext.connectionAttempts++; - emitter.connect(flowContext.session, this, index); - } catch (Exception e) { - logger.error(e.getMessage(), e); + connect(flowContext, true); + } + + protected void connect(FlowContext flowContext, boolean wait) { + flowContext.lock.lock(); + try { + flowContext.connectionAttempts++; + emitter.connect(flowContext.session, this, index); + if (wait) { + waitOpFinished(flowContext, STATE_CONNECTED); } + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.lock.unlock(); } } 
@Override public void disconnect() { - synchronized (lock) { - for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { - FlowContext flowContext = entry.getValue(); - try { - disconnect(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } + for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { + FlowContext flowContext = entry.getValue(); + try { + disconnect(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); } } + } - sessions.clear(); - working = false; - } + sessions.clear(); + working = false; } protected void disconnect(SessionEvent sessionEvent) { @@ -218,16 +232,14 @@ @Override public void disconnect(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = flowContext(session); - if (flowContext != null) { - disconnect(flowContext, true); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } + try { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + disconnect(flowContext, true); + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); } } } @@ -237,22 +249,70 @@ } protected void disconnect(FlowContext flowContext, boolean removeFlow) { - if (flowContext.channelContext() != null) { - updateFlowState(flowContext, STATE_DISCONNECTING); + disconnect(flowContext, removeFlow, true); + } + + protected void disconnect(FlowContext flowContext, boolean removeFlow, boolean wait) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Disconnect."); + } + + SessionInfo sessionInfo = null; + long now = timeGenerator.currentTimeMillis(); + flowContext.lock.lock(); + try { + if (flowContext.state == STATE_DISCONNECTING + || flowContext.state == STATE_DISCONNECTED) { + return; + } + + /*if (removeFlow) { + sessionInfo = flowContext.session; + }*/ + + flowContext.state = 
STATE_DISCONNECTING; + flowContext.timeout = now + timeouts.getDisconnectingTimeout(); + try { - flowContext.channelContext().close(); + onDisconnecting(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); + debug(flowContext, "Error occurred during onDisconnecting calling.", e); } } - flowContext.clear(); - if (removeFlow) { - removeFlowContext(flowContext); + if (flowContext.channelContext() != null) { + try { + flowContext.channelContext().close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } } - updateFlowState(flowContext, STATE_DISCONNECTED); + if (wait) { + waitOpFinished(flowContext, STATE_DISCONNECTED, timeouts.getDisconnectingTimeout()); + } + } catch (InterruptedException e) { + error(flowContext, e); + } finally { + flowContext.lock.unlock(); + } + + if (removeFlow && sessionInfo != null) { + synchronized (sessions) { + sessions.remove(sessionInfo); + } + } + } + + protected void errorInternal(FlowContext flowContext, Throwable cause) { + if (flowContext == null) { + logger.error("Internal error.", cause); + } else { + logger.error("Flow {} internal error.", flowContext, cause); + error(flowContext, FlowError.internalError(cause)); } } @@ -266,10 +326,10 @@ } flowContext.error(error); - flowError(flowContext); + onError(flowContext); } - protected void flowError(FlowContext flowContext) { + protected void onError(FlowContext flowContext) { } @@ -278,36 +338,30 @@ onRequestSent(flowContext, event); } - protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) { - - } - private void responseReceived0(FlowContext flowContext, Object response) { onResponseReceived(flowContext, response); flowContext.sentEvent(null); flowContext.receivedStartTimestamp(-1); } - protected void onResponseReceived(FlowContext flowContext, Object response) { + protected void onConnected(FlowContext flowContext) { } - private void 
updateFlowState(FlowContext flowContext, byte newState) { - updateFlowState(flowContext, newState, flowContext.state); + protected void onDisconnecting(FlowContext flowContext) { + } - private void updateFlowState(FlowContext flowContext, byte newState, byte oldState) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow status changing {} -> {}.", - contextStateToString(flowContext.state()), - contextStateToString(newState) - ); - } + protected void onDisconnected(FlowContext flowContext) { - long timeout = timeouts.get(newState); - flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); - flowContext.state(newState); - flowStateChanged(flowContext, oldState); + } + + protected void onRequestSent(FlowContext flowContext, SessionPayloadEvent event) { + + } + + protected void onResponseReceived(FlowContext flowContext, Object response) { + } protected void removeFlowContext(FlowContext flowContext) { @@ -317,11 +371,11 @@ } } - protected void reconnect(FlowContext flowContext) { + /*protected void reconnect(FlowContext flowContext) { synchronized (lock) { try { if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state())); + debug(flowContext, "Reconnect (state: {}).", stateToString(flowContext.state())); } SessionInfo session = flowContext.sessionInfo(); @@ -331,13 +385,15 @@ error(flowContext, e.getMessage(), e); } } - } + }*/ protected void closeAllConnections() { - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { - disconnect(flowContext); - } + closeAllConnections(true); + } + + protected void closeAllConnections(boolean wait) { + for (FlowContext flowContext : sessions.values()) { + disconnect(flowContext, wait); } } @@ -368,45 +424,70 @@ @Override public void writeMetrics(MetricsContainer container) { - synchronized (lock) { - super.writeMetrics(container); - } + super.writeMetrics(container); } @Override public void channelActive(ChannelContext context) 
throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); - } + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + flowContext.lock(); + try { context.setBidirectional(flowContext.isBidirectional()); flowContext.channelContext(context); context.setAttachment(flowContext); flowContext.connectionAttempts = 0; flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); - updateFlowState(flowContext, STATE_CONNECTED); + flowContext.state = STATE_CONNECTED; + + try { + onConnected(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Error occurred during onConnected calling.", e); + } + } + } catch (Exception ex) { + error(flowContext, ex); + } finally { + flowContext.signal(); + flowContext.unlock(); } - - lock.notifyAll(); } } @Override public void channelInactive(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = (FlowContext) context.getAttachment(); - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); + FlowContext flowContext = (FlowContext) context.getAttachment(); + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } + + flowContext.lock(); + try { + flowContext.state = STATE_DISCONNECTED; + try { + onDisconnected(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Error occurred during onDisconnected calling.", e); + } } - updateFlowState(flowContext, STATE_DISCONNECTED); - lock.notifyAll(); + sessions.remove(flowContext.session); + } finally { + 
flowContext.clear(); + flowContext.signal(); + flowContext.unlock(); } + + } @Override @@ -428,8 +509,9 @@ @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - synchronized (lock) { - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = (FlowContext) context.getAttachment(); + flowContext.lock(); + try { try { FlowHandler client = flowContext.client(); FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); @@ -491,8 +573,9 @@ error(flowContext, FlowError.unknownError()); } - - lock.notifyAll(); + } finally { + flowContext.signal(); + flowContext.unlock(); } } @@ -512,9 +595,9 @@ public void dataWritten(ChannelContext context) throws Exception { synchronized (lock) { FlowContext flowContext = (FlowContext) context.getAttachment(); - if (flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); + if (flowContext.isEventSent()) { if (collectMetric) { + long now = timeGenerator.currentTimeMillis(); synchronized (metric) { metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); } @@ -533,16 +616,17 @@ @Override public final void errorOccurred(ChannelContext context, Throwable cause) throws Exception { if (logger.isDebugEnabled()) { - logger.debug("Error occurred. " + cause.getMessage(), cause); + logger.debug("Error occurred. 
", cause); } - synchronized (lock) { - FlowContext flowContext = (FlowContext) context.getAttachment(); - //Jezeli nie nastapilo polaczenie flowContext == null - if (flowContext == null) { - flowContext = flowContext(context); - } + FlowContext flowContext = (FlowContext) context.getAttachment(); + //Jezeli nie nastapilo polaczenie flowContext == null + if (flowContext == null) { + flowContext = flowContext(context); + } + flowContext.lock.lock(); + try { if (flowContext.state == STATE_CONNECTING) { if (flowContext.connectionAttempts < maxConnectionAttempts) { //TODO - malo optymalne, blokuje przetwarzanie eventow dla konkretnej sesji. @@ -559,18 +643,20 @@ return; } } + } finally { + flowContext.lock.unlock(); + } - error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); - lock.notifyAll(); - } + error(flowContext, new FlowError(CODE_CONNECTION_ATTEMPTS_REACHED, "Max connection attempts reached.")); } - protected boolean send(FlowContext flowContext, SessionPayloadEvent event) { - synchronized (lock) { + protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { + flowContext.lock(); + try { Object req = event.getRequest(); if (req != null) { if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { - return false; + return; } ByteBuff buffer; @@ -589,7 +675,7 @@ error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached.")); } - return false; + return; } if (collectMetric) { @@ -603,17 +689,90 @@ flowContext.channelContext().writeAndFlush(buffer); requestSent0(flowContext, event); buffer.clear(); - return true; } catch (Exception e) { flowContext.sendErrors++; if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); } } + + if (wait && flowContext.isBidirectional()) { + waitForResponse(flowContext); + } } + } catch (Exception e) { + error(flowContext, e); + } finally { + flowContext.unlock(); + } + } + + 
protected boolean waitForResponse(FlowContext flowContext) throws InterruptedException { + return waitForResponse(flowContext, timeouts.getDefaultTimeout()); + } + + protected boolean waitForResponse(FlowContext flowContext, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while (flowContext.sentEvent != null && flowContext.state == STATE_CONNECTED) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); } - return false; + return true; + } + + protected boolean waitOpFinished(FlowContext flowContext, byte neededFlags) throws InterruptedException { + return waitOpFinished(flowContext, neededFlags, timeouts.getDefaultTimeout()); + } + + protected boolean waitOpFinished(FlowContext flowContext, byte stateNeeded, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while ((flowContext.state & stateNeeded) != stateNeeded && !flowContext.isError()) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); + } + + return true; + } + + protected void processFlowSessionStatusEvent(SessionStatusEvent statusEvent, boolean wait) { + FlowContext flowContext = null; + try { + if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { + flowContext = registerAndConnect(statusEvent.getSessionInfo(), wait); + flowContext.sessionEstablishedSeen = true; + } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { + flowContext = flowContext(statusEvent); + if (flowContext != null) { + disconnect(flowContext, wait); + } + } + } catch (Exception e) { + errorInternal(flowContext, e); + } + } + + protected boolean mayReconnect(FlowContext flowContext) { + if (flowContext.state == STATE_CONNECTED || !flowContext.sessionEstablishedSeen) { + return false; + } + + return flowContext.error != null + && (flowContext.error.code() == CODE_CONNECTION_RESET_BY_PEER + 
|| flowContext.error.code() == CODE_CONNECTION_CLOSED_UNEXPECTEDLY + || flowContext.error.code() == CODE_IDE_TIMEOUT); + } + + protected boolean checkMayConnectIfPartial(FlowContext flowContext) { + return connectPartialSession + && !flowContext.sessionEstablishedSeen + && !flowContext.isError(); } protected void processTimeouts() { @@ -629,7 +788,7 @@ if (logger.isDebugEnabled()) { debug(flowContext, "Flow for session '{}' timed out (state '{}').", flowContext.sessionInfo(), - contextStateToString(flowContext.state())); + stateToString(flowContext.state())); } switch (flowContext.state()) {
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Apr 02 14:02:38 2020 +0200 @@ -95,11 +95,11 @@ closeAllConnections = false; } - @Override - protected boolean send(FlowContext flowContext, SessionPayloadEvent event) { +/* @Override + protected boolean send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { //Sprawdzamy, czy polaczen nie jest za duzo. Jezeli jest, to zamykamy //najmniej uzywane. - if (flowIndex.size() > maxSentRequests) { + if (flowIndex.size() > maxSentRequests) { int diff = flowIndex.size() - maxSentRequests; if (logger.isDebugEnabled()) { debug(flowContext, "Too many connections {}.", flowIndex.size()); @@ -119,7 +119,9 @@ } return super.send(flowContext, event); - } + + + }*/ private boolean canSend(FlowContext flowContext) { return flowContext.state() == FlowContext.STATE_CONNECTED && !flowContext.isEventSent(); @@ -158,7 +160,7 @@ } else if (event.getType() == SessionPayloadEvent.TYPE && canSend(flowContext)) { localFlowContext.eventsQueue.poll(); - send(flowContext, (SessionPayloadEvent) event); + send(flowContext, (SessionPayloadEvent) event, true); } else { localFlowContext.eventsQueue.poll(); } @@ -230,7 +232,7 @@ && (flowContext.state() == FlowContext.STATE_CONNECTED || flowContext.state() == FlowContext.STATE_ERROR || flowContext.isEventSent())) { - send(flowContext, payloadEvent); + send(flowContext, payloadEvent, true); } else { addToQueue(flowContext, event); }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Apr 02 14:02:38 2020 +0200 @@ -4,28 +4,24 @@ import com.passus.commons.annotations.Plugin; import com.passus.st.emitter.Emitter; import com.passus.st.plugin.PluginConstants; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; -import java.io.IOException; import java.util.concurrent.LinkedBlockingDeque; -import static com.passus.st.client.FlowContext.contextStateToString; +import static com.passus.st.client.FlowContext.STATE_CONNECTED; @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) public class SynchFlowWorker extends FlowWorkerBase { + protected final Logger logger = LogManager.getLogger(getClass()); + public static final String TYPE = "synch"; private final LinkedBlockingDeque<Event> eventsQueue = new LinkedBlockingDeque<>(); private long eventsQueueWaitTime = 100; - /** - * Context dla ktorego wykonywana jest operacja. - */ - private FlowContext currFlowContext; - - private boolean loopEnd = false; - private int loop = 0; public SynchFlowWorker(Emitter emitter, String name, int index) { @@ -47,62 +43,12 @@ } @Override - protected void flowStateChanged(FlowContext context, int oldState) { - if (logger.isDebugEnabled()) { - logger.debug("flowStateChanged {},{}", context == currFlowContext, contextStateToString(context.state())); - } - - if (context == currFlowContext) { - if (context.state() == FlowContext.STATE_CONNECTED - || context.state() == FlowContext.STATE_ERROR - || context.state() == FlowContext.STATE_DISCONNECTED) { - currFlowContext = null; - } - } - } - - @Override public void handle(Event event) { Event newEvent = eventInstanceForWorker(event); - synchronized (lock) { - try { - eventsQueue.put(newEvent); - } catch (Exception e) { - logger.debug("Unable to add event to queue. 
" + e.getMessage(), e); - } - - lock.notifyAll(); - } - } - - @Override - protected void closeAllConnections() { - synchronized (lock) { - boolean wait; - do { - wait = false; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.isEventSent()) { - wait = true; - break; - } - } - - if (wait) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } while (wait); - - super.closeAllConnections(); - while (!sessions.isEmpty()) { - try { - lock.wait(100); - } catch (Exception e) { - } - } + try { + eventsQueue.put(newEvent); + } catch (Exception e) { + logger.debug("Unable to add event to queue. ", e); } } @@ -111,164 +57,84 @@ * * @return boolean */ - private boolean pollNext() { - Event event = eventsQueue.poll(); - if (event != null) { - sleep(event); - if (logger.isTraceEnabled()) { - logger.trace("Event processing: {}", event); + private void process(Event event) { + sleep(event); + if (logger.isTraceEnabled()) { + logger.trace("Event processing: {}", event); + } + + if (event instanceof SessionEvent) { + SessionEvent sessEvent = (SessionEvent) event; + if (isBlockedSession(sessEvent.getSessionInfo())) { + return; } - if (event instanceof SessionEvent) { - SessionEvent sessEvent = (SessionEvent) event; - if (isBlockedSession(sessEvent.getSessionInfo())) { - return true; - } + if (event.getType() == SessionStatusEvent.TYPE) { + SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent; + processFlowSessionStatusEvent(statusEvent, true); + return; + } else if (event.getType() == SessionPayloadEvent.TYPE) { + FlowContext flowContext = flowContext(sessEvent); - if (event.getType() == SessionStatusEvent.TYPE) { - SessionStatusEvent statusEvent = (SessionStatusEvent) sessEvent; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - try { - currFlowContext = register(statusEvent); - if (currFlowContext != null) { - currFlowContext.loop(loop); - emitter.connect(statusEvent.getSessionInfo(), this, index); - } - } catch 
(Exception e) { - logger.error(e.getMessage(), e); + if (flowContext != null) { + if (flowContext.blocked) { + return; + } else if (mayReconnect(flowContext)) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Reconnecting."); } - return (currFlowContext == null); - } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - currFlowContext = flowContext((SessionEvent) event); - if (currFlowContext != null) { - if (!currFlowContext.isEventSent()) { - disconnect(statusEvent); - } - } - } - - return true; - } else if (event.getType() == SessionPayloadEvent.TYPE) { - FlowContext flowContext = flowContext(sessEvent); - if (flowContext.blocked) { - return true; + connect(flowContext, true); } - - if (flowContext != null) { - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_ERROR: - currFlowContext = flowContext; - if (send(flowContext, (SessionPayloadEvent) event)) { - return false; - } else { - currFlowContext = null; - return true; - } - case FlowContext.STATE_DISCONNECTING: - case FlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - currFlowContext = register(sessEvent); - if (currFlowContext != null) { - try { - currFlowContext.loop(loop); - emitter.connect(sessEvent.getSessionInfo(), this, index); - } catch (IOException e) { - logger.error(e.getMessage(), e); - currFlowContext = null; - } - } - - return false; - } else { - return true; - } - default: - return false; - } - } else if (connectPartialSession) { - currFlowContext = register(sessEvent); - if (currFlowContext != null) { - try { - currFlowContext.loop(loop); - emitter.connect(sessEvent.getSessionInfo(), this, index); - eventsQueue.addFirst(sessEvent); - } catch (IOException e) { - logger.error(e.getMessage(), e); - currFlowContext = null; - } - - return false; - } else { - return true; - } - } - - return true; - } else { - return true; - } - } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) { - if 
(logger.isDebugEnabled()) { - logger.debug("DataLoopEnd received."); + } else if (connectPartialSession) { + flowContext = registerAndConnect(sessEvent.getSessionInfo(), true); } - loopEnd = true; - closeAllConnections(); - filterChain.reset(); - if (currFlowContext != null) { - loop = currFlowContext.loop() + 1; + if (flowContext.state == STATE_CONNECTED) { + send(flowContext, (SessionPayloadEvent) event, true); } + } + } else if (event.getType() == DataEvents.DataLoopEnd.TYPE) { + if (logger.isDebugEnabled()) { + logger.debug("DataLoopEnd received."); + } - loopEnd = false; - return true; - } else if (event.getType() == DataEvents.DataEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataEnd received. Deactivation."); - } + closeAllConnections(); + filterChain.reset(); + } else if (event.getType() == DataEvents.DataEnd.TYPE) { + if (logger.isDebugEnabled()) { + logger.debug("DataEnd received. Deactivation."); + } - working = false; - } + working = false; } - - return false; } @Override public void disconnect() { - synchronized (lock) { - eventsQueue.clear(); - super.disconnect(); - lock.notifyAll(); - } + super.disconnect(); + eventsQueue.clear(); } @Override public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - try { - lock.wait(eventsQueueWaitTime); - } catch (InterruptedException ignore) { - } + working = true; + while (working) { + Event event = null; + try { + event = eventsQueue.take(); + } catch (InterruptedException ignore) { + } - boolean nextPoll; - do { - if (loopEnd || !working) { - break; - } - - nextPoll = pollNext(); - } while (nextPoll); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } + try { + process(event); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); } } } } + + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java Thu Apr 02 14:02:38 2020 +0200
@@ -0,0 +1,68 @@
+package com.passus.st.client;
+
+/**
+ * Mutable set of timeout values: a default, a connecting and a disconnecting
+ * timeout. Each value starts at its {@code DEFAULT_*} constant.
+ *
+ * NOTE(review): the unit is presumably milliseconds - confirm against callers.
+ */
+public class Timeouts {
+
+    public static final long DEFAULT_TIMEOUT = 10_000L;
+    public static final long DEFAULT_CONNECTING_TIMEOUT = 10_000L;
+    public static final long DEFAULT_DISCONNECTING = 10_000L;
+
+    private long defaultTimeout = DEFAULT_TIMEOUT;
+    private long connectingTimeout = DEFAULT_CONNECTING_TIMEOUT;
+    private long disconnectingTimeout = DEFAULT_DISCONNECTING;
+
+    /** Creates an instance with every timeout set to its default. */
+    public Timeouts() {
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param timeouts instance whose values are copied; when {@code null}
+     *                 the defaults are kept
+     */
+    public Timeouts(Timeouts timeouts) {
+        // Read from the argument: unqualified names resolve to this object's
+        // own fields, which would make every assignment a no-op.
+        if (timeouts != null) {
+            this.defaultTimeout = timeouts.defaultTimeout;
+            this.connectingTimeout = timeouts.connectingTimeout;
+            this.disconnectingTimeout = timeouts.disconnectingTimeout;
+        }
+    }
+
+    /** @return the default timeout */
+    public long getDefaultTimeout() {
+        return defaultTimeout;
+    }
+
+    /** @param defaultTimeout new default timeout */
+    public void setDefaultTimeout(long defaultTimeout) {
+        this.defaultTimeout = defaultTimeout;
+    }
+
+    /** @return the connecting timeout */
+    public long getConnectingTimeout() {
+        return connectingTimeout;
+    }
+
+    /** @param connectingTimeout new connecting timeout */
+    public void setConnectingTimeout(long connectingTimeout) {
+        this.connectingTimeout = connectingTimeout;
+    }
+
+    /** @return the disconnecting timeout */
+    public long getDisconnectingTimeout() {
+        return disconnectingTimeout;
+    }
+
+    /** @param disconnectingTimeout new disconnecting timeout */
+    public void setDisconnectingTimeout(long disconnectingTimeout) {
+        this.disconnectingTimeout = disconnectingTimeout;
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Thu Apr 02 14:02:38 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.commons.service.ServiceUtils; import com.passus.st.AbstractWireMockTest; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.utils.EventUtils; @@ -19,7 +20,6 @@ public class FlowExecutorTest extends AbstractWireMockTest { - private NioEmitter prepareEmitter(String mapperRule) throws Exception { RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); sessionMapper.addRule(mapperRule); @@ -61,7 +61,7 @@ flowExecutor.handle(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); events.forEach(flowExecutor::handle); - flowExecutor.join(); + flowExecutor.join(2_000); assertTrue(listener.size() > 0); assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0); @@ -94,7 +94,7 @@ events.forEach(flowExecutor::handle); - flowExecutor.join(); + flowExecutor.join(2_000); assertTrue(listener.size() > 0); assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent);
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Thu Apr 02 14:02:38 2020 +0200 @@ -126,13 +126,20 @@ @Test public void testDisconnect() throws Exception { TestFlowWorker worker = worker(); - makeConnected(worker, session); + ChannelContext channelContext = makeConnected(worker, session); + FlowContext flowContext = worker.flowContext(session); + when(channelContext.getAttachment()).thenReturn(flowContext); - FlowContext flowContext = worker.flowContext(session); + doAnswer((Answer<Void>) invocation -> { + worker.channelInactive(channelContext); + return null; + } + ).when(channelContext).close(); worker.disconnect(session); assertEquals(FlowContext.STATE_DISCONNECTED, flowContext.state); assertEquals(null, flowContext.buffer); + assertNull(flowContext.error); assertNull(worker.flowContext(session)); }
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Apr 02 14:02:38 2020 +0200 @@ -27,7 +27,6 @@ public static final long JOIN_TIMEOUT = 1_000; - private final TestHttpClientListener listener = new TestHttpClientListener(); private static class LocalEmitter implements Emitter { @@ -262,6 +261,11 @@ } @Test + public void testConnectSuccess() { + + } + + @Test public void testHandle_HTTP_SimpleRequestResponse() throws Exception { List<Event> events = readDefaultEvents(); SynchFlowWorker worker = createWorker();
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Apr 02 10:40:41 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Apr 02 14:02:38 2020 +0200 @@ -4,6 +4,7 @@ import com.passus.commons.utils.ArrayUtils; import com.passus.net.netflow.Netflow9; import com.passus.net.netflow.Netflow9Decoder; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.Protocols; import com.passus.st.client.Event; import com.passus.st.client.FlowExecutor;