Mercurial > stress-tester
changeset 973:641a1a8bcb12
PcapSessionEventSource - TCP/UDP session processing improvements + bugfixes
author | Devel 2 |
---|---|
date | Wed, 24 Jul 2019 14:19:28 +0200 |
parents | 6fc989064ecf |
children | e3532e4a84fe |
files | stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java |
diffstat | 9 files changed, 635 insertions(+), 728 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Wed Jul 24 14:19:28 2019 +0200 @@ -15,7 +15,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; @Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class AsynchFlowWorker extends FlowWorkerBased { +public class AsynchFlowWorker extends FlowWorkerBase { public static final String TYPE = "asynch";
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Wed Jul 24 14:19:28 2019 +0200 @@ -0,0 +1,566 @@ +package com.passus.st.client; + +import com.passus.commons.Assert; +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.data.HeapByteBuff; +import com.passus.filter.Filter; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.metric.MetricsContainer; +import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; +import it.unimi.dsi.fastutil.ints.Int2LongMap; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static com.passus.st.client.FlowContext.*; +import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; + +public abstract class FlowWorkerBase extends FlowWorker { + + public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; + + protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); + + private final Set<SessionInfo> blockedSessions = new HashSet<>(); + + private final Int2LongMap timeouts = new Int2LongArrayMap(); + + protected final Object lock = new Object(); + + protected volatile boolean working = false; + + private long checkTimeoutsPeriod = 5_000; + + private long nextCheckTimeoutsTime = -1; + + private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; + + private long lastEventTimestamp = -1; + + public FlowWorkerBase(Emitter emitter, String name, int index) { + super(emitter, name, index); + timeouts.putAll(DEFAULT_TIMEOUTS); + } + + @Override + public boolean isWorking() { + return working; + } + + @Override + public int activeConnections() { + int count = 0; + synchronized (lock) { + for (FlowContext flowContext : sessions.values()) { + if (flowContext.state() != STATE_DISCONNECTED) { + count++; + } + } + } + + return count; + } + + protected final void addBlockedSession(SessionInfo session) { + blockedSessions.add(session); + } + + protected final boolean isBlockedSession(SessionInfo session) { + return !blockedSessions.isEmpty() && blockedSessions.contains(session); + } + + public float getSleepFactor() { + return sleepFactor; + } + + public void setSleepFactor(float sleepFactor) { + Assert.greaterOrEqualZero(sleepFactor, "sleepFactor"); + this.sleepFactor = sleepFactor; + } + + public long getCheckTimeoutsPeriod() { + return checkTimeoutsPeriod; + } + + public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) { + Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod"); + this.checkTimeoutsPeriod = checkTimeoutsPeriod; + } + + protected final void changeFlowState(FlowContext flowContext, int state) { + try { + if (flowContext.state() == state) { + return; + } + + int oldState = flowContext.state(); + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow status changing {} -> {}.", + contextStateToString(flowContext.state()), + contextStateToString(state) + ); + } + + switch (state) { + case FlowContext.STATE_CONNECTING: + flowContext.clear(); + break; + case FlowContext.STATE_CONNECTED: + flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); + break; + case FlowContext.STATE_ERROR: + changeFlowState(flowContext, STATE_DISCONNECTED); + break; + case FlowContext.STATE_RESP_RECEIVED: + flowContext.sentEvent(null); + flowContext.receivedStartTimestamp(-1); + break; + case FlowContext.STATE_DISCONNECTING: + if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { + if (flowContext.channelContext() != null) { + try { + flowContext.channelContext().close(); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } else { + changeFlowState(flowContext, STATE_DISCONNECTED); + } + } else { + return; + } + break; + case STATE_DISCONNECTED: + flowContext.state(STATE_DISCONNECTED); + flowContext.clear(); + removeFlowContext(flowContext); + flowStateChanged(flowContext, oldState); + return; + } + + long timeout = timeouts.get(flowContext.state()); + flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); + flowContext.state(state); + flowStateChanged(flowContext, oldState); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + protected void flowStateChanged(FlowContext context, int oldState) { + + } + + protected FlowContext flowContext(SessionEvent event) { + return flowContext(event.getSessionInfo()); + } + + protected FlowContext flowContext(ChannelContext context) { + return flowContext(context.getSessionInfo()); + } + + protected FlowContext flowContext(SessionInfo session) { + FlowContext context = sessions.get(session); + if (context == null) { + if (logger.isDebugEnabled()) { + logger.debug("Context for session '" + session + "' not found."); + } + } + + return context; + } + + protected FlowContext createFlowContext(SessionInfo session) { + return new FlowContext(session); + } + + protected FlowContext register(SessionEvent sessionEvent) { + return register(sessionEvent.getSessionInfo()); + } + + protected FlowContext register(SessionInfo session) { + synchronized (lock) { + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. Session already registered."); + return null; + } + + FlowContext flowContext = createFlowContext(session); + //TODO Malo optymalne + FlowHandler client = clientFactory.create(session.getProtocolId()); + client.init(flowContext); + flowContext.client(client); + sessions.put(session, flowContext); + return flowContext; + } + } + + protected FlowContext connect(SessionEvent sessionEvent) { + return connect(sessionEvent.getSessionInfo()); + } + + protected FlowContext connect(SessionInfo session) { + synchronized (lock) { + try { + FlowContext flowContext = register(session); + if (flowContext != null) { + emitter.connect(session, this, index); + return flowContext; + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + return null; + } + } + + @Override + public void close() { + synchronized (lock) { + for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { + FlowContext flowContext = entry.getValue(); + try { + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + } + + sessions.clear(); + working = false; + } + } + + protected void close(SessionEvent sessionEvent) { + close(sessionEvent.getSessionInfo()); + } + + protected void close(FlowContext flowContext) { + synchronized (lock) { + try { + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + @Override + public void close(SessionInfo session) { + synchronized (lock) { + try { + FlowContext flowContext = flowContext(session); + closeSession(flowContext); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + protected void closeSession(FlowContext flowContext) { + synchronized (lock) { + if (flowContext != null) { + changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); + } + } + } + + protected void removeFlowContext(FlowContext flowContext) { + synchronized (lock) { + debug(flowContext, "removeFlowContext"); + sessions.remove(flowContext.sessionInfo()); + } + } + + protected void reconnect(FlowContext flowContext) { + synchronized (lock) { + try { + if (logger.isDebugEnabled()) { + debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state())); + } + + SessionInfo session = flowContext.sessionInfo(); + changeFlowState(flowContext, FlowContext.STATE_CONNECTING); + emitter.connect(session, this, index); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); + } + } + } + + protected void closeAllConnections() { + synchronized (lock) { + for (FlowContext flowContext : sessions.values()) { + closeSession(flowContext); + } + } + } + + private void sleepSilently(long millis) { + if (millis == 0) { + return; + } + if (millis < 0) { + logger.warn("Cannot sleep for negative interval: {}."); + return; + } + logger.debug("Going sleep for: {}.", millis); + try { + Thread.sleep(millis); + } catch (InterruptedException ignore) { + } + } + + protected void sleep(Event event) { + if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { + if (lastEventTimestamp != -1) { + long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); + sleepSilently(timeToSleep); + } + lastEventTimestamp = event.getTimestamp(); + } + } + + @Override + public void writeMetrics(MetricsContainer container) { + synchronized (lock) { + super.writeMetrics(container); + } + } + + @Override + public void channelActive(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", + context.getLocalAddress(), + context.getRemoteAddress()); + } + + context.setBidirectional(flowContext.isBidirectional()); + flowContext.channelContext(context); + changeFlowState(flowContext, STATE_CONNECTED); + } + + lock.notifyAll(); + } + } + + @Override + public void channelInactive(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } + + changeFlowState(flowContext, STATE_DISCONNECTED); + } + lock.notifyAll(); + } + } + + @Override + public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + try { + if (flowContext != null) { + FlowHandler client = flowContext.client(); + FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); + decoder.decode(data, flowContext); + long now = timeGenerator.currentTimeMillis(); + if (flowContext.receivedStartTimestamp() == -1) { + flowContext.receivedStartTimestamp(now); + } + + if (decoder.state() == DataDecoder.STATE_ERROR) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Decoder error. " + decoder.getLastError()); + } + + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + } else if (decoder.state() == DataDecoder.STATE_FINISHED) { + Object resp = decoder.getResult(); + Object req = null; + if (flowContext.sentEvent() != null) { + req = flowContext.sentEvent().getRequest(); + } + + if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { + try { + fireResponseReceived(req, resp, flowContext); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); + } + } + + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + + lock.notifyAll(); + } + } + + @Override + public void dataWriteStart(ChannelContext context) { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null && flowContext.sentEvent() != null) { + long now = timeGenerator.currentTimeMillis(); + flowContext.sendStartTimestamp(now); + flowContext.client().onDataWriteStart(flowContext); + } + } + } + + @Override + public void dataWritten(ChannelContext context) throws Exception { + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null && flowContext.sentEvent() != null) { + long now = timeGenerator.currentTimeMillis(); + if (collectMetric) { + synchronized (metric) { + metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); + } + } + + flowContext.client().onDataWriteEnd(flowContext); + } + + lock.notifyAll(); + } + } + + @Override + public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Error occurred. " + cause.getMessage(), cause); + } + + synchronized (lock) { + FlowContext flowContext = flowContext(context); + if (flowContext != null) { + changeFlowState(flowContext, FlowContext.STATE_ERROR); + } + + lock.notifyAll(); + } + } + + protected boolean send(FlowContext flowContext, SessionPayloadEvent event) { + synchronized (lock) { + Object req = event.getRequest(); + if (req != null) { + if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { + return false; + } + + FlowHandler client = flowContext.client(); + FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); + ByteBuff buffer = flowContext.buffer(); + encoder.encode(req, flowContext, buffer); + + if (collectMetric) { + synchronized (metric) { + metric.incRequestsNum(); + metric.addRequestSize(flowContext.buffer().readableBytes()); + } + } + + try { + changeFlowState(flowContext, FlowContext.STATE_REQ_SENT); + flowContext.sentEvent(event); + flowContext.channelContext().writeAndFlush(buffer); + buffer.clear(); + return true; + } catch (Exception e) { + if (logger.isDebugEnabled()) { + debug(flowContext, e.getMessage(), e); + } + } + } + } + + return false; + } + + protected void processTimeouts() { + synchronized (lock) { + try { + long now = timeGenerator.currentTimeMillis(); + if (nextCheckTimeoutsTime == -1) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + } else if (nextCheckTimeoutsTime > now) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + for (FlowContext flowContext : sessions.values()) { + if (flowContext.timeouted()) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow for session '{}' timed out (state '{}').", + flowContext.sessionInfo(), + contextStateToString(flowContext.state())); + } + + switch (flowContext.state()) { + case FlowContext.STATE_CONNECTING: + case FlowContext.STATE_CONNECTED: + case FlowContext.STATE_REQ_SENT: + case FlowContext.STATE_ERROR: + closeSession(flowContext); + break; + case FlowContext.STATE_RESP_RECEIVED: + //Dziwny blad nie powinien wystepowac + break; + case FlowContext.STATE_DISCONNECTING: + case STATE_DISCONNECTED: + removeFlowContext(flowContext); + break; + } + } + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); + } + } + } + } + + protected Event eventInstanceForWorker(Event event) { + if (event instanceof SessionEvent) { + Event newEvent = ((SessionEvent) event).instanceForWorker(index); + newEvent.setTimestamp(event.getTimestamp()); + return newEvent; + } else { + return event; + } + } + + +}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Tue Jul 23 10:44:29 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,569 +0,0 @@ -package com.passus.st.client; - -import com.passus.commons.Assert; -import com.passus.data.ByteBuff; -import com.passus.data.DataDecoder; -import com.passus.data.HeapByteBuff; -import com.passus.filter.Filter; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.metric.MetricsContainer; -import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; -import it.unimi.dsi.fastutil.ints.Int2LongMap; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import static com.passus.st.client.FlowContext.*; -import static com.passus.st.client.FlowUtils.DEFAULT_TIMEOUTS; - -public abstract class FlowWorkerBased extends FlowWorker { - - public static final float SLEEP_FACTOR_NO_SLEEP = 0.0f; - - protected final Map<SessionInfo, FlowContext> sessions = new ConcurrentHashMap<>(); - - private final Set<SessionInfo> blockedSessions = new HashSet<>(); - - private final Int2LongMap timeouts = new Int2LongArrayMap(); - - protected final Object lock = new Object(); - - protected volatile boolean working = false; - - private long checkTimeoutsPeriod = 5_000; - - private long nextCheckTimeoutsTime = -1; - - private float sleepFactor = SLEEP_FACTOR_NO_SLEEP; - - private long lastEventTimestamp = -1; - - public FlowWorkerBased(Emitter emitter, String name, int index) { - super(emitter, name, index); - timeouts.putAll(DEFAULT_TIMEOUTS); - } - - @Override - public boolean isWorking() { - return working; - } - - @Override - public int activeConnections() { - int count = 0; - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { - if (flowContext.state() != STATE_DISCONNECTED) { - count++; - } - } - } - - return count; - } - - protected final void addBlockedSession(SessionInfo session) { - blockedSessions.add(session); - } - - protected final boolean isBlockedSession(SessionInfo session) { - return !blockedSessions.isEmpty() && blockedSessions.contains(session); - } - - public float getSleepFactor() { - return sleepFactor; - } - - public void setSleepFactor(float sleepFactor) { - Assert.greaterOrEqualZero(sleepFactor, "sleepFactor"); - this.sleepFactor = sleepFactor; - } - - public long getCheckTimeoutsPeriod() { - return checkTimeoutsPeriod; - } - - public void setCheckTimeoutsPeriod(long checkTimeoutsPeriod) { - Assert.greaterThanZero(checkTimeoutsPeriod, "checkTimeoutsPeriod"); - this.checkTimeoutsPeriod = checkTimeoutsPeriod; - } - - protected final void changeFlowState(FlowContext flowContext, int state) { - try { - if (flowContext.state() == state) { - return; - } - - int oldState = flowContext.state(); - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow status changing {} -> {}.", - contextStateToString(flowContext.state()), - contextStateToString(state) - ); - } - - switch (state) { - case FlowContext.STATE_CONNECTING: - flowContext.clear(); - break; - case FlowContext.STATE_CONNECTED: - flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); - break; - case FlowContext.STATE_ERROR: - changeFlowState(flowContext, STATE_DISCONNECTED); - break; - case FlowContext.STATE_RESP_RECEIVED: - flowContext.sentEvent(null); - flowContext.receivedStartTimestamp(-1); - break; - case FlowContext.STATE_DISCONNECTING: - if (flowContext.state() < FlowContext.STATE_DISCONNECTING) { - if (flowContext.channelContext() != null) { - try { - flowContext.channelContext().close(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } else { - changeFlowState(flowContext, STATE_DISCONNECTED); - } - } else { - return; - } - break; - case STATE_DISCONNECTED: - flowContext.state(STATE_DISCONNECTED); - flowContext.clear(); - removeFlowContext(flowContext); - flowStateChanged(flowContext, oldState); - return; - } - - long timeout = timeouts.get(flowContext.state()); - flowContext.timeout(timeGenerator.currentTimeMillis() + timeout); - flowContext.state(state); - flowStateChanged(flowContext, oldState); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - } - - protected void flowStateChanged(FlowContext context, int oldState) { - - } - - protected FlowContext flowContext(SessionEvent event) { - return flowContext(event.getSessionInfo()); - } - - protected FlowContext flowContext(ChannelContext context) { - return flowContext(context.getSessionInfo()); - } - - protected FlowContext flowContext(SessionInfo session) { - FlowContext context = sessions.get(session); - if (context == null) { - if (logger.isDebugEnabled()) { - logger.debug("Context for session '" + session + "' not found."); - } - } - - return context; - } - - protected FlowContext createFlowContext(SessionInfo session) { - return new FlowContext(session); - } - - protected FlowContext register(SessionEvent sessionEvent) { - return register(sessionEvent.getSessionInfo()); - } - - protected FlowContext register(SessionInfo session) { - synchronized (lock) { - if (sessions.containsKey(session)) { - logger.warn("Unable to register session '" + session + "'. Session already registered."); - return null; - } - - FlowContext flowContext = createFlowContext(session); - //TODO Malo optymalne - FlowHandler client = clientFactory.create(session.getProtocolId()); - client.init(flowContext); - flowContext.client(client); - sessions.put(session, flowContext); - return flowContext; - } - } - - protected FlowContext connect(SessionEvent sessionEvent) { - return connect(sessionEvent.getSessionInfo()); - } - - protected FlowContext connect(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = register(session); - if (flowContext != null) { - emitter.connect(session, this, index); - return flowContext; - } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - return null; - } - } - - @Override - public void close() { - synchronized (lock) { - for (Map.Entry<SessionInfo, FlowContext> entry : sessions.entrySet()) { - FlowContext flowContext = entry.getValue(); - try { - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } - } - } - - sessions.clear(); - working = false; - } - } - - protected void close(SessionEvent sessionEvent) { - close(sessionEvent.getSessionInfo()); - } - - protected void close(FlowContext flowContext) { - synchronized (lock) { - try { - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - @Override - public void close(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = flowContext(session); - closeSession(flowContext); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - protected void closeSession(FlowContext flowContext) { - synchronized (lock) { - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_DISCONNECTING); - } - } - } - - protected void removeFlowContext(FlowContext flowContext) { - synchronized (lock) { - debug(flowContext, "removeFlowContext"); - sessions.remove(flowContext.sessionInfo()); - } - } - - protected void reconnect(FlowContext flowContext) { - synchronized (lock) { - try { - if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnect (state: {}).", contextStateToString(flowContext.state())); - } - - SessionInfo session = flowContext.sessionInfo(); - changeFlowState(flowContext, FlowContext.STATE_CONNECTING); - emitter.connect(session, this, index); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - } - - protected void closeAllConnections() { - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { - closeSession(flowContext); - } - } - } - - private void sleepSilently(long millis) { - if (millis == 0) { - return; - } - if (millis < 0) { - logger.warn("Cannot sleep for negative interval: {}."); - return; - } - logger.debug("Going sleep for: {}.", millis); - try { - Thread.sleep(millis); - } catch (InterruptedException ignore) { - } - } - - protected void sleep(Event event) { - if (sleepFactor != SLEEP_FACTOR_NO_SLEEP) { - if (lastEventTimestamp != -1) { - long timeToSleep = (long) ((event.getTimestamp() - lastEventTimestamp) * sleepFactor); - sleepSilently(timeToSleep); - } - lastEventTimestamp = event.getTimestamp(); - } - } - - @Override - public void writeMetrics(MetricsContainer container) { - synchronized (lock) { - super.writeMetrics(container); - } - } - - @Override - public void channelActive(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel active (localSocket: {}, remoteSocket: {})", - context.getLocalAddress(), - context.getRemoteAddress()); - } - - context.setBidirectional(flowContext.isBidirectional()); - flowContext.channelContext(context); - changeFlowState(flowContext, STATE_CONNECTED); - } - - lock.notifyAll(); - } - } - - @Override - public void channelInactive(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); - } - - changeFlowState(flowContext, STATE_DISCONNECTED); - } - lock.notifyAll(); - } - } - - @Override - public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - try { - if (flowContext != null) { - FlowHandler client = flowContext.client(); - FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); - decoder.decode(data, flowContext); - long now = timeGenerator.currentTimeMillis(); - if (flowContext.receivedStartTimestamp() == -1) { - flowContext.receivedStartTimestamp(now); - } - - if (decoder.state() == DataDecoder.STATE_ERROR) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Decoder error. " + decoder.getLastError()); - } - - decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } else if (decoder.state() == DataDecoder.STATE_FINISHED) { - Object resp = decoder.getResult(); - Object req = null; - if (flowContext.sentEvent() != null) { - req = flowContext.sentEvent().getRequest(); - } - - if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { - try { - fireResponseReceived(req, resp, flowContext); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - - decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } - } - - lock.notifyAll(); - } - } - - @Override - public void dataWriteStart(ChannelContext context) { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); - flowContext.sendStartTimestamp(now); - flowContext.client().onDataWriteStart(flowContext); - } - } - } - - @Override - public void dataWritten(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); - if (collectMetric) { - synchronized (metric) { - metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); - } - } - - flowContext.client().onDataWriteEnd(flowContext); - } - - lock.notifyAll(); - } - } - - @Override - public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Error occurred. " + cause.getMessage(), cause); - } - - synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_ERROR); - } - - lock.notifyAll(); - } - } - - protected boolean send(FlowContext flowContext, SessionPayloadEvent event) { - synchronized (lock) { - Object req = event.getRequest(); - if (req != null) { - if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { - return false; - } - - FlowHandler client = flowContext.client(); - FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); - ByteBuff buffer = flowContext.buffer(); - encoder.encode(req, flowContext, buffer); - - if (collectMetric) { - synchronized (metric) { - metric.incRequestsNum(); - metric.addRequestSize(flowContext.buffer().readableBytes()); - } - } - - try { - changeFlowState(flowContext, FlowContext.STATE_REQ_SENT); - flowContext.sentEvent(event); - flowContext.channelContext().writeAndFlush(buffer); - buffer.clear(); - - - - return true; - } catch (Exception e) { - if (logger.isDebugEnabled()) { - debug(flowContext, e.getMessage(), e); - } - } - } - } - - return false; - } - - protected void processTimeouts() { - synchronized (lock) { - try { - long now = timeGenerator.currentTimeMillis(); - if (nextCheckTimeoutsTime == -1) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - } else if (nextCheckTimeoutsTime > now) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.timeouted()) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow for session '{}' timed out (state '{}').", - flowContext.sessionInfo(), - contextStateToString(flowContext.state())); - } - - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTING: - case FlowContext.STATE_CONNECTED: - case FlowContext.STATE_REQ_SENT: - case FlowContext.STATE_ERROR: - closeSession(flowContext); - break; - case FlowContext.STATE_RESP_RECEIVED: - //Dziwny blad nie powinien wystepowac - break; - case FlowContext.STATE_DISCONNECTING: - case STATE_DISCONNECTED: - removeFlowContext(flowContext); - break; - } - } - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } - } - } - - protected Event eventInstanceForWorker(Event event) { - if (event instanceof SessionEvent) { - Event newEvent = ((SessionEvent) event).instanceForWorker(index); - newEvent.setTimestamp(event.getTimestamp()); - return newEvent; - } else { - return event; - } - } - - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Wed Jul 24 14:19:28 2019 +0200 @@ -12,7 +12,7 @@ import java.util.concurrent.Semaphore; @Plugin(name = ParallelFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class ParallelFlowWorker extends FlowWorkerBased { +public class ParallelFlowWorker extends FlowWorkerBase { public static final String TYPE = "parallel";
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Wed Jul 24 14:19:28 2019 +0200 @@ -12,7 +12,7 @@ import static com.passus.st.client.FlowContext.contextStateToString; @Plugin(name = SynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class SynchFlowWorker extends FlowWorkerBased { +public class SynchFlowWorker extends FlowWorkerBase { public static final String TYPE = "synch";
--- a/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/BaseSessionAnalyzerListener.java Wed Jul 24 14:19:28 2019 +0200 @@ -10,7 +10,6 @@ import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_CLOSE; import static com.passus.net.session.TcpSessionProcessorConstants.STATUS_ESTABLISHED; -import static com.passus.st.Protocols.HTTP; public abstract class BaseSessionAnalyzerListener<T> implements SessionAnalyzerListener<T> { @@ -70,7 +69,7 @@ SessionInfo info = new SessionInfo( context.getSrcIpAddr(), context.getSrcPort(), context.getDstIpAddr(), context.getDstPort(), - HTTP, context.getProtocol(), context.getId()); + context.getProtocol(), protocolId, context.getId()); info.setSourceName(sourceName); Event event = new SessionStatusEvent(info, sessionInfoStatus, sourceName); event.setTimestamp(timestamp);
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java Wed Jul 24 14:19:28 2019 +0200 @@ -22,10 +22,8 @@ import com.passus.net.session.TcpSessionProcessor; import com.passus.net.session.UdpSessionProcessor; import com.passus.net.source.pcap.FrameDecoderImpl; -import com.passus.st.client.AbstractEvent; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; -import com.passus.st.client.Event; import com.passus.st.client.EventHandler; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; @@ -37,9 +35,6 @@ import java.io.IOException; import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import static com.passus.config.schema.ConfigurationSchemaBuilder.*; @@ -70,12 +65,8 @@ private PcapThread pcapThread; - private SessionAnalyzerThread sessionAnalyzerThread; - private EventHandler handler; - private final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); - private boolean allowPartialSession = false; protected boolean collectMetric; @@ -235,8 +226,7 @@ } public boolean isWorking() { - return (pcapThread != null && pcapThread.working) - || (sessionAnalyzerThread != null && sessionAnalyzerThread.working); + return (pcapThread != null && pcapThread.working); } private static void sleepFor(long millis) { @@ -259,16 +249,13 @@ } try { - pcapThread = new PcapThread(pcapFile, loops); - sessionAnalyzerThread = new SessionAnalyzerThread(allowPartialSession); - + SessionPacketHandler sessionPacketHandler = new SessionPacketHandler(allowPartialSession); + pcapThread = new PcapThread(pcapFile, sessionPacketHandler, loops); pcapThread.start(); - sessionAnalyzerThread.start(); started = true; } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } - } @Override @@ -285,29 +272,12 @@ } catch (Exception ignored) { } - sessionAnalyzerThread.deactivate(); - sessionAnalyzerThread.interrupt(); - - try { - sessionAnalyzerThread.join(500); - } catch (Exception ignored) { - } - pcapThread = null; - sessionAnalyzerThread = null; - queue.clear(); started = false; } - public void waitForEmptyQueue() throws InterruptedException { - while (!queue.isEmpty()) { - Thread.sleep(100); - } - } - public void join() throws InterruptedException { pcapThread.join(); - waitForEmptyQueue(); } private class PcapThread extends Thread { @@ -320,10 +290,13 @@ private volatile boolean working = false; - private PcapThread(String pcapFile, int loop) throws IOException { + private final SessionPacketHandler sessionPacketHandler; + + private PcapThread(String pcapFile, SessionPacketHandler sessionPacketHandler, int loop) throws IOException { super(PcapSessionEventSource.class.getSimpleName() + ".PcapThread"); reader = new PcapDataBlockReader(pcapFile); reader.open(); + this.sessionPacketHandler = sessionPacketHandler; this.loop = loop; } @@ -331,7 +304,7 @@ try { working = false; reader.close(); - dataBlockHandler.working = false; + sessionPacketHandler.deactivate(); } catch (Exception e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(e.getMessage(), e); @@ -339,22 +312,6 @@ } } - private void offer(Event event) { - boolean res = true; - do { - try { - res = queue.offer(event, 100, TimeUnit.MILLISECONDS); - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(e.getMessage(), e); - } - } - } while (working && !res); - if (collectMetric) { - metric.incEnqueued(); - } - } - @Override public void run() { working = true; @@ -372,19 +329,24 @@ metric.incFrames(); } - dataBlockHandler.handle(dataBlock); + Ip ip = dataBlockHandler.handle(dataBlock); + if (ip != null) { + sessionPacketHandler.handle(ip); + } } if (working) { reader.reset(); } + sessionPacketHandler.onLoopEnd(); DataLoopEnd loopEvent = new DataLoopEnd(sourceName, loopNum); - offer(loopEvent); + handler.handle(loopEvent); if (loop != LOOP_INFINITE) { loop--; if (loop == 0) { - offer(new DataEnd(sourceName)); + sessionPacketHandler.onDataEnd(); + handler.handle(new DataEnd(sourceName)); break; } } @@ -395,7 +357,6 @@ sleepFor(loopDelay); } } - } catch (Exception e) { LOGGER.error(e.getMessage(), e); } @@ -407,8 +368,6 @@ private class DataBlockHandler { - private boolean working = true; - private long frameNum = 0; private long firstTimestamp = -1; @@ -425,28 +384,12 @@ this.loopDelay = loopDelay; } - private void offer(Event event) { - boolean res = true; - do { - try { - res = queue.offer(event, 100, TimeUnit.MILLISECONDS); - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(e.getMessage(), e); - } - } - } while (working && !res); - if (collectMetric) { - metric.incEnqueued(); - } - } - private void nextLoop() { firstTimestamp = -1; loopCorrection = relativeTimestamp + loopDelay; } - public void handle(PcapDataBlock dataBlock) { + public Ip handle(PcapDataBlock dataBlock) { Frame frame = new MemoryFrame(dataBlock.getData(), dataBlock.getTimestamp()); frameDecoder.decode(frame); frame.setNumber(frameNum++); @@ -466,18 +409,17 @@ } frame.setTimestamp(relativeTimestamp); - offer(new IpEvent(ip)); + return ip; } } } } + + return null; } - } - private class SessionAnalyzerThread extends Thread { - - private volatile boolean working = false; + private class SessionPacketHandler { private long sessionFlushPeriod = 500; @@ -493,9 +435,7 @@ private final PcapSessionAnalyzerHookContext hookContext; - private SessionAnalyzerThread(boolean allowPartialSession) { - super(PcapSessionEventSource.class.getSimpleName() + ".SessionAnalyzerThread"); - + private SessionPacketHandler(boolean allowPartialSession) { tcpProc = new TcpSessionProcessor(); tcpProc.setTimeGenerator(timeGenerator); tcpProc.setAllowPartialSession(allowPartialSession); @@ -508,90 +448,53 @@ PcapSessionAnalyzerHook hook = getHook(analyzer); hook.attach(analyzer, hookContext); } + + tcpProc.start(); + udpProc.start(); + analyzers.forEach(Service::start); } public void deactivate() { - working = false; tcpProc.stop(); udpProc.stop(); analyzers.forEach(Service::stop); } - @Override - public void run() { - working = true; - tcpProc.start(); - udpProc.start(); - analyzers.forEach(Service::start); - - long lastTime = 0; - while (working) { - try { - Event event = queue.poll(100, TimeUnit.MILLISECONDS); - - if (event != null) { - if (collectMetric) { - metric.incDequeued(); - } - - LOGGER.trace("queue poll {}", event); - if (event.getType() == IpEvent.TYPE) { - IpEvent ipEvent = (IpEvent) event; - long time = ipEvent.ip.getTimestamp(); - timeGenerator.setTimeMillis(time); - - if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) { - tcpProc.handle(ipEvent.ip); - } else if (ipEvent.ip.getProtocol() == Ip.PROTO_UDP) { - udpProc.handle(ipEvent.ip); - } + public void onLoopEnd() { + tcpProc.closeAllSessions(); + eventHandler.flush(); + } - if (collectMetric) { - if (ipEvent.ip.getProtocol() == Ip.PROTO_TCP) { - metric.incTcpPacket(); - } - } + public void onDataEnd() { + eventHandler.flush(); + } - if (nextFlushTime == -1) { - nextFlushTime = time + sessionFlushPeriod; - } else if (nextFlushTime <= time) { - tcpProc.flush(); - nextFlushTime = time + sessionFlushPeriod; - } - lastTime = time; - } else if (event.getType() == DataLoopEnd.TYPE) { - event.setTimestamp(lastTime + 1); - eventHandler.handle(event); - tcpProc.closeAllSessions(); - eventHandler.flush(); - } else if (event.getType() == DataEnd.TYPE) { - event.setTimestamp(lastTime + 1); - eventHandler.handle(event); - eventHandler.flush(); - break; - } - } - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(e.getMessage(), e); - } - } - + public void handle(Ip ip) { + if (collectMetric) { + metric.incDequeued(); } - working = false; - } - } - - private static class IpEvent extends AbstractEvent { + long time = ip.getTimestamp(); + timeGenerator.setTimeMillis(time); - private static final int TYPE = 1; + if (ip.getProtocol() == Ip.PROTO_TCP) { + tcpProc.handle(ip); + if (collectMetric) { + metric.incTcpPacket(); + } + } else if (ip.getProtocol() == Ip.PROTO_UDP) { + udpProc.handle(ip); + if (collectMetric) { + metric.incUdpPacket(); + } + } - private final Ip ip; - - public IpEvent(Ip ip) { - super(TYPE, null); - this.ip = ip; + if (nextFlushTime == -1) { + nextFlushTime = time + sessionFlushPeriod; + } else if (nextFlushTime <= time) { + tcpProc.flush(); + nextFlushTime = time + sessionFlushPeriod; + } } }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSourceMetric.java Wed Jul 24 14:19:28 2019 +0200 @@ -22,6 +22,7 @@ private final MutableInt frames = new MutableInt(0); private final MutableInt tcpPackets = new MutableInt(0); + private final MutableInt udpPackets = new MutableInt(0); private final MutableInt payloads = new MutableInt(0); private final AtomicInteger events = new AtomicInteger(); private final MutableInt eventsEnqueued = new MutableInt(); @@ -79,6 +80,10 @@ tcpPackets.increment(); } + public void incUdpPacket() { + udpPackets.increment(); + } + public void incPayloads() { payloads.increment(); }
--- a/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Tue Jul 23 10:44:29 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/source/PcapSessionEventSourceTest.java Wed Jul 24 14:19:28 2019 +0200 @@ -16,6 +16,7 @@ import org.testng.annotations.Test; import java.io.File; +import java.util.Iterator; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertNull; @@ -95,12 +96,14 @@ waitForSource(src); src.stop(); - SessionPayloadEvent payloadEvent = (SessionPayloadEvent) handler.findFirst(SessionPayloadEvent.TYPE); + Iterator<Event> it = handler.getEvents().iterator(); + SessionPayloadEvent payloadEvent = (SessionPayloadEvent) it.next(); assertTrue(payloadEvent.getRequest() instanceof Dns); assertTrue(payloadEvent.getResponse() instanceof Dns); - assertTrue(handler.get(handler.size() - 1) instanceof DataEnd); + assertTrue(it.next() instanceof DataLoopEnd); + assertTrue(it.next() instanceof DataEnd); } @Test(enabled = true)