Mercurial > stress-tester
changeset 1033:386815ce52ee
FlowWorkerBase refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/AsynchFlowWorker.java Thu Apr 02 15:34:59 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,543 +0,0 @@ -package com.passus.st.client; - -import com.passus.commons.Assert; -import com.passus.commons.annotations.Plugin; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.plugin.PluginConstants; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -@Plugin(name = AsynchFlowWorker.TYPE, category = PluginConstants.CATEGORY_FLOW_WORKER) -public class AsynchFlowWorker extends FlowWorkerBase { - - public static final String TYPE = "asynch"; - - private long waitTimeout = 100; - - private long windowPeriod = 10; - - private final Queue<TasksTimeWindow> windows = new ConcurrentLinkedQueue<>(); - - private TasksTimeWindow currentWindow; - - private volatile boolean processWindow = false; - - private volatile boolean flowStateChanged = false; - - private final Object readerLock = new Object(); - - private boolean responseSynch = false; - - public AsynchFlowWorker(Emitter emitter, String name, int index) { - super(emitter, name, index); - - } - - public long getWindowPeriod() { - return windowPeriod; - } - - public void setWindowPeriod(long windowPeriod) { - Assert.greaterThanZero(windowPeriod, "tickTime"); - this.windowPeriod = windowPeriod; - } - - public long getWaitTimeout() { - return waitTimeout; - } - - public void setWaitTimeout(long waitTimeout) { - Assert.greaterThanZero(waitTimeout, "eventsQueueWaitTime"); - this.waitTimeout = waitTimeout; - } - - public boolean isResponseSynch() { - return responseSynch; - } - - public void setResponseSynch(boolean responseSynch) { - this.responseSynch = responseSynch; - } - - private void waitQuietly() { - try { - lock.wait(waitTimeout); - } catch (InterruptedException ignore) { - } - } - - @Override - protected void closeAllConnections() { - synchronized (lock) { - boolean wait; - do { - wait = false; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.isEventSent()) { - wait = true; - break; - } - } - - if (wait) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } while (wait); - - super.closeAllConnections(); - while (!sessions.isEmpty()) { - try { - lock.wait(100); - } catch (Exception e) { - } - } - } - } - - private void removeWindow(TasksTimeWindow window) { - if (window != null) { - windows.remove(window); - } - } - - private TasksTimeWindow createWindow(long time) { - TasksTimeWindow window = new TasksTimeWindow(time, windowPeriod); - windows.add(window); - return window; - } - - private TasksTimeWindow getWindow(long time, boolean create) { - if (windows.isEmpty()) { - if (create) { - return createWindow(time); - } - - return null; - } - - TasksTimeWindow firstWindow = windows.peek(); - if (firstWindow.inTimeRange(time)) { - return firstWindow; - } else if (windows.size() > 1) { - Iterator<TasksTimeWindow> it = windows.iterator(); - it.next(); - - while (it.hasNext()) { - TasksTimeWindow window = it.next(); - if (window.inTimeRange(time)) { - return window; - } - } - } - - if (create) { - return createWindow(time); - } - - return null; - } - - @Override - protected void flowStateChanged(FlowContext context, int oldState) { - synchronized (lock) { - flowStateChanged = true; - lock.notifyAll(); - } - } - - private void addEvent(Event event, TasksTimeWindow window) { - switch (event.getType()) { - case SessionPayloadEvent.TYPE: { - Event newEvent = eventInstanceForWorker(event); - long time = newEvent.getTimestamp(); - SessionPayloadEvent payloadEvent = (SessionPayloadEvent) newEvent; - SessionInfo session = payloadEvent.getSessionInfo(); - - SessionEventsTask task = window.getSessionEventsTask(session, true); - task.events.add(payloadEvent); - - if (responseSynch) { - HttpResponse resp = (HttpResponse) payloadEvent.getResponse(); - HttpRequest req = (HttpRequest) payloadEvent.getRequest(); - long respTime = time + (resp.getTimestamp() - req.getTimestamp()); - HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime); - - task.events.add(respEvent); - } - - break; - } - case SessionStatusEvent.TYPE: { - Event newEvent = eventInstanceForWorker(event); - SessionStatusEvent statusEvent = (SessionStatusEvent) newEvent; - SessionEventsTask task = window.getSessionEventsTask(statusEvent.getSessionInfo(), true); - task.events.add(statusEvent); - break; - } - case DataEvents.DataLoopEnd.TYPE: - window.add(DataLoopEndTask.INSTANCE); - processWindow = true; - break; - case DataEvents.DataEnd.TYPE: - window.add(DataEndTask.INSTANCE); - processWindow = true; - break; - } - } - - @Override - public void handle(Event event) { - synchronized (readerLock) { - while (processWindow) { - try { - readerLock.wait(); - } catch (InterruptedException ignore) { - - } - } - } - - synchronized (lock) { - TasksTimeWindow window = getWindow(event.getTimestamp(), true); - if (currentWindow == null) { - currentWindow = window; - } else if (window != currentWindow) { - currentWindow = windows.peek(); - processWindow = true; - } - - addEvent(event, window); - if (processWindow) { - lock.notifyAll(); - } - } - - } - - private boolean processSessionEvent(SessionEvent event) { - switch (event.getType()) { - case SessionStatusEvent.TYPE: { - SessionStatusEvent statusEvent = (SessionStatusEvent) event; - if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { - connect(statusEvent); - } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { - FlowContext flowContext = flowContext(statusEvent); - if (flowContext != null) { - if (!flowContext.isEventSent()) { - disconnect(statusEvent); - } - } - } - - return true; - } - case SessionPayloadEvent.TYPE: { - SessionEvent sessEvent = event; - FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTING: - return false; - case FlowContext.STATE_CONNECTED: - if (flowContext.isEventSent()) { - return false; - } else { - send(flowContext, (SessionPayloadEvent) event, true); - return true; - } - case FlowContext.STATE_DISCONNECTING: - case FlowContext.STATE_DISCONNECTED: - if (connectPartialSession) { - connect(sessEvent); - } else { - return true; - } - break; - } - } else if (connectPartialSession) { - connect(sessEvent); - } - - return true; - } - case HttpResponseEvent.TYPE: { - SessionEvent sessEvent = event; - FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { - return (flowContext.state() >= FlowContext.STATE_DISCONNECTING); - } - - return true; - } - } - - return false; - } - - @Override - public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - lock.wait(); - - boolean dataLoopEnd = false; - boolean dataEnd = false; - - for (; ; ) { - Iterator<Task> it = currentWindow.tasks.iterator(); - while (it.hasNext()) { - Task task = it.next(); - switch (task.type()) { - case SessionEventsTask.TYPE: - SessionEventsTask sessionTask = (SessionEventsTask) task; - if (isBlockedSession(sessionTask.session)) { - sessionTask.events.clear(); - it.remove(); - return; - } - - if (!sessionTask.events.isEmpty()) { - Event event = sessionTask.events.get(0); - if (processSessionEvent((SessionEvent) event)) { - sessionTask.events.remove(0); - } - } - - if (sessionTask.events.isEmpty()) { - it.remove(); - } - - break; - case DataLoopEndTask.TYPE: - dataLoopEnd = true; - it.remove(); - break; - case DataEndTask.TYPE: - dataEnd = true; - it.remove(); - break; - } - } - - if (currentWindow.tasks.isEmpty()) { - break; - } else if (!flowStateChanged) { - waitQuietly(); - } else { - flowStateChanged = false; - } - } - - if (dataLoopEnd) { - closeAllConnections(); - } - - if (dataEnd) { - working = false; - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } - } finally { - removeWindow(currentWindow); - processWindow = false; - synchronized (readerLock) { - readerLock.notifyAll(); - } - } - } - } - } - - private static final class HttpResponseEvent extends SessionEvent { - - public static final int TYPE = 1012; - - private final HttpResponse payload; - - public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) { - super(sessionInfo, sourceName); - this.payload = payload; - setTimestamp(timestamp); - } - - public HttpResponse getPayload() { - return payload; - } - - @Override - public SessionEvent instanceForWorker(int index) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getType() { - return TYPE; - } - - } - - private static interface Task { - - public int type(); - - } - - private static final class SessionEventsTask implements Task { - - public static final int TYPE = 1; - - private final SessionInfo session; - - private final List<SessionEvent> events = new LinkedList<>(); - - public SessionEventsTask(SessionInfo session) { - this.session = session; - } - - @Override - public int type() { - return TYPE; - } - - } - - private static final class DataLoopEndTask implements Task { - - public static final DataLoopEndTask INSTANCE = new DataLoopEndTask(); - - public static final int TYPE = 2; - - @Override - public int type() { - return TYPE; - } - - } - - private static final class DataEndTask implements Task { - - public static final DataEndTask INSTANCE = new DataEndTask(); - - public static final int TYPE = 3; - - @Override - public int type() { - return TYPE; - } - - } - - private static final class TasksTimeWindow { - - private final long startTime; - - private final long endTime; - - private final List<Task> tasks = new LinkedList<>(); - - private TasksTimeWindow(long time, long period) { - int factor = (int) (time / period); - startTime = factor * period; - endTime = (factor + 1) * period; - } - - public long startTime() { - return startTime; - } - - public long endTime() { - return endTime; - } - - public boolean inTimeRange(long time) { - return (startTime <= time && time < endTime); - } - - public void add(Task value) { - tasks.add(value); - } - - private SessionEventsTask createSessionEventsTask(SessionInfo session) { - SessionEventsTask task = new SessionEventsTask(session); - tasks.add(task); - return task; - } - - public SessionEventsTask getSessionEventsTask(SessionInfo session, boolean create) { - if (tasks.isEmpty()) { - if (create) { - return createSessionEventsTask(session); - } - - return null; - } - - for (Task task : tasks) { - if (task.type() == SessionEventsTask.TYPE) { - SessionEventsTask sessionTask = (SessionEventsTask) task; - if (sessionTask.session.equals(session)) { - return sessionTask; - } - } - } - - if (create) { - return createSessionEventsTask(session); - } - - return null; - } - - public List<Task> values() { - return tasks; - } - - public long getPeroid() { - return (endTime - startTime); - } - - @Override - public String toString() { - return "TimeWindow{" - + "startTimestamp=" + startTime - + ", endTimestamp=" + endTime + '}'; - } - - @Override - public int hashCode() { - int hash = 5; - hash = 17 * hash + (int) (this.startTime ^ (this.startTime >>> 32)); - hash = 17 * hash + (int) (this.endTime ^ (this.endTime >>> 32)); - return hash; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (!(obj instanceof TasksTimeWindow)) { - return false; - } - - final TasksTimeWindow other = (TasksTimeWindow) obj; - return this.startTime == other.startTime - && this.endTime == other.endTime; - } - - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -36,16 +36,18 @@ protected SessionPayloadEvent sentEvent; - protected byte state = STATE_CONNECTING; + protected byte state = STATE_NEW; protected long connectionTime = -1; protected long timeout = -1; + protected long writeStartTime = -1; + + protected long writeEndTime = -1; + protected long receivedStartTime = -1; - protected long sendStartTime = -1; - protected int loop; protected FlowHandler client; @@ -92,6 +94,11 @@ lockCond.signal(); } + void signalAndUnlock() { + signal(); + unlock(); + } + public boolean isBidirectional() { return bidirectional; } @@ -163,11 +170,11 @@ } public long sendStartTimestamp() { - return sendStartTime; + return writeStartTime; } public void sendStartTimestamp(long sendStartTimestamp) { - this.sendStartTime = sendStartTimestamp; + this.writeStartTime = sendStartTimestamp; } public SessionPayloadEvent sentEvent() { @@ -269,6 +276,9 @@ timeout = -1; error = null; blocked = false; + writeStartTime = -1; + writeEndTime = -1; + receivedStartTime = -1; if (params != null) { params.clear();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Fri Apr 03 15:08:47 2020 +0200 @@ -29,8 +29,6 @@ private final Timeouts timeouts = new Timeouts(); - protected final Object lock = new Object(); - protected volatile boolean working = false; private long checkTimeoutsPeriod = 5_000; @@ -61,11 +59,14 @@ @Override public int activeConnections() { int count = 0; - synchronized (lock) { - for (FlowContext flowContext : sessions.values()) { + for (FlowContext flowContext : sessions.values()) { + flowContext.lock(); + try { if (flowContext.state() != STATE_DISCONNECTED) { count++; } + } finally { + flowContext.unlock(); } } @@ -175,22 +176,21 @@ } protected FlowContext connect(SessionInfo session) { - synchronized (lock) { - try { - FlowContext flowContext = register(session); - if (flowContext != null) { - connect(flowContext, true); - } - } catch (Exception e) { - logger.error(e); - } - return null; + FlowContext flowContext = register(session); + if (flowContext != null) { + connect(flowContext, true); } + + return flowContext; } protected FlowContext registerAndConnect(SessionInfo session, boolean wait) { FlowContext flowContext = flowContext(session); if (flowContext != null) { + if (flowContext.blocked) { + return flowContext; + } + throw new RuntimeException("Not implemented yet."); } else { flowContext = register(session); @@ -208,6 +208,7 @@ flowContext.lock.lock(); try { flowContext.connectionAttempts++; + flowContext.state = STATE_CONNECTING; emitter.connect(flowContext.session, this, index); if (wait) { waitOpFinished(flowContext, STATE_CONNECTED); @@ -268,8 +269,12 @@ } long now = timeGenerator.currentTimeMillis(); - flowContext.lock.lock(); + flowContext.lock(); try { + if (trace) { + debug(flowContext, "Disconnecting."); + } + if (flowContext.state == STATE_DISCONNECTING || flowContext.state == STATE_DISCONNECTED) { return; @@ -302,7 +307,17 @@ } catch (InterruptedException e) { error(flowContext, e); } finally { - flowContext.lock.unlock(); + flowContext.signalAndUnlock(); + } + } + + protected void disconnectAllConnections() { + disconnectAllConnections(true); + } + + protected void disconnectAllConnections(boolean wait) { + for (FlowContext flowContext : sessions.values()) { + disconnect(flowContext, wait); } } @@ -363,39 +378,6 @@ } - protected void removeFlowContext(FlowContext flowContext) { - synchronized (lock) { - debug(flowContext, "removeFlowContext"); - sessions.remove(flowContext.sessionInfo()); - } - } - - /*protected void reconnect(FlowContext flowContext) { - synchronized (lock) { - try { - if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnect (state: {}).", stateToString(flowContext.state())); - } - - SessionInfo session = flowContext.sessionInfo(); - updateFlowState(flowContext, FlowContext.STATE_CONNECTING); - emitter.connect(session, this, index); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - }*/ - - protected void closeAllConnections() { - closeAllConnections(true); - } - - protected void closeAllConnections(boolean wait) { - for (FlowContext flowContext : sessions.values()) { - disconnect(flowContext, wait); - } - } - private void sleepSilently(long millis) { if (millis == 0) { return; @@ -440,7 +422,7 @@ try { context.setBidirectional(flowContext.isBidirectional()); flowContext.channelContext(context); - context.setAttachment(flowContext); + context.setFlowContext(flowContext); flowContext.connectionAttempts = 0; flowContext.buffer(new HeapByteBuff(FlowContext.INIT_BUFFER_CAPACITY)); flowContext.state = STATE_CONNECTED; @@ -463,7 +445,7 @@ @Override public void channelInactive(ChannelContext context) throws Exception { - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = (FlowContext) context.getFlowContext(); if (logger.isDebugEnabled()) { debug(flowContext, "Channel inactive."); } @@ -489,24 +471,25 @@ @Override public void sessionInvalidated(SessionInfo session) throws Exception { - synchronized (lock) { - if (logger.isDebugEnabled()) { - logger.debug("Session {} invalidated.", session); - } + if (logger.isDebugEnabled()) { + logger.debug("Session {} invalidated.", session); + } - FlowContext flowContext = flowContext(session); - if (flowContext != null) { + FlowContext flowContext = flowContext(session); + if (flowContext != null) { + flowContext.lock(); + try { disconnect(flowContext); + addBlockedSession(session); + } finally { + flowContext.signalAndUnlock(); } - - addBlockedSession(session); - lock.notifyAll(); } } @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = (FlowContext) context.getFlowContext(); logger.debug("dataReceived"); flowContext.lock(); try { @@ -573,42 +556,43 @@ error(flowContext, FlowError.unknownError()); } } finally { - flowContext.signal(); - flowContext.unlock(); + flowContext.signalAndUnlock(); } } @Override public void dataWriteStart(ChannelContext context) { - synchronized (lock) { - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = context.getFlowContext(); + flowContext.lock(); + try { if (flowContext.sentEvent() != null) { - long now = timeGenerator.currentTimeMillis(); - flowContext.sendStartTimestamp(now); - flowContext.client().onDataWriteStart(flowContext); + flowContext.writeStartTime = timeGenerator.currentTimeMillis(); + flowContext.writeEndTime = -1; + flowContext.client.onDataWriteStart(flowContext); } + } finally { + flowContext.signalAndUnlock(); } } @Override public void dataWritten(ChannelContext context) throws Exception { - synchronized (lock) { - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = context.getFlowContext(); + flowContext.lock(); + try { if (flowContext.isEventSent()) { + long now = timeGenerator.currentTimeMillis(); if (collectMetric) { - long now = timeGenerator.currentTimeMillis(); synchronized (metric) { metric.addRequestSendingTime(now - flowContext.sendStartTimestamp()); } } - flowContext.client().onDataWriteEnd(flowContext); - if (!flowContext.isBidirectional()) { - responseReceived0(flowContext, null); - } + flowContext.writeEndTime = now; + flowContext.client.onDataWriteEnd(flowContext); } - - lock.notifyAll(); + } finally { + flowContext.signalAndUnlock(); } } @@ -618,7 +602,7 @@ logger.debug("Error occurred. ", cause); } - FlowContext flowContext = (FlowContext) context.getAttachment(); + FlowContext flowContext = (FlowContext) context.getFlowContext(); //Jezeli nie nastapilo polaczenie flowContext == null if (flowContext == null) { flowContext = flowContext(context); @@ -627,9 +611,13 @@ flowContext.lock.lock(); try { if (flowContext.state == STATE_CONNECTING) { - if (flowContext.connectionAttempts < maxConnectionAttempts) { - //TODO - malo optymalne, blokuje przetwarzanie eventow dla konkretnej sesji. - // Odbije sie nw wydajnosci workera asynch. + if (flowContext.connectionAttempts <= maxConnectionAttempts) { + if (logger.isDebugEnabled()) { + logger.debug("Connection failed. Reconnection (attempt {}/{}, delay {}ms).", + flowContext.connectionAttempts, maxConnectionAttempts, reconnectDelay, + cause); + } + if (reconnectDelay > 0) { try { Thread.sleep(reconnectDelay); @@ -640,6 +628,10 @@ connect(flowContext); return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("Connection failed. No reconnection.", flowContext.connectionAttempts, maxConnectionAttempts, cause); + } } } } finally { @@ -650,10 +642,8 @@ } protected void send(FlowContext flowContext, SessionPayloadEvent event, boolean wait) { - logger.debug("send"); flowContext.lock(); try { - logger.debug("send-after lock"); Object req = event.getRequest(); if (req != null) { if (filterChain.filterOutbound(req, event.getResponse(), flowContext) == Filter.DENY) { @@ -688,12 +678,18 @@ try { flowContext.sentEvent = event; + flowContext.writeStartTime = -1; + flowContext.writeEndTime = -1; flowContext.channelContext().writeAndFlush(buffer); requestSent0(flowContext, event); buffer.clear(); - if (wait && flowContext.isBidirectional()) { - waitForResponse(flowContext); + if (wait) { + if (flowContext.isBidirectional()) { + waitForResponse(flowContext); + } else { + waitForWriteEnd(flowContext); + } } } catch (Exception e) { flowContext.sendErrors++; @@ -713,6 +709,23 @@ } } + protected boolean waitForWriteEnd(FlowContext flowContext) throws InterruptedException { + return waitForWriteEnd(flowContext, timeouts.getDefaultTimeout()); + } + + protected boolean waitForWriteEnd(FlowContext flowContext, long timeout) throws InterruptedException { + long timeNanos = TimeUnit.MILLISECONDS.toNanos(timeout); + while (flowContext.writeEndTime == -1 && !flowContext.isError()) { + if (timeNanos <= 0) { + return false; + } + + timeNanos = flowContext.lockCond.awaitNanos(timeNanos); + } + + return true; + } + protected boolean waitForResponse(FlowContext flowContext) throws InterruptedException { return waitForResponse(flowContext, timeouts.getDefaultTimeout()); } @@ -783,34 +796,32 @@ } protected void processTimeouts() { - synchronized (lock) { - try { - long now = timeGenerator.currentTimeMillis(); - if (nextCheckTimeoutsTime == -1) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - } else if (nextCheckTimeoutsTime > now) { - nextCheckTimeoutsTime = now + checkTimeoutsPeriod; - for (FlowContext flowContext : sessions.values()) { - if (flowContext.timeouted(now)) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Flow for session '{}' timed out (state '{}').", - flowContext.sessionInfo(), - stateToString(flowContext.state())); - } + try { + long now = timeGenerator.currentTimeMillis(); + if (nextCheckTimeoutsTime == -1) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + } else if (nextCheckTimeoutsTime > now) { + nextCheckTimeoutsTime = now + checkTimeoutsPeriod; + for (FlowContext flowContext : sessions.values()) { + if (flowContext.timeouted(now)) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Flow for session '{}' timed out (state '{}').", + flowContext.sessionInfo(), + stateToString(flowContext.state())); + } - switch (flowContext.state()) { - case FlowContext.STATE_CONNECTING: - case FlowContext.STATE_CONNECTED: - disconnect(flowContext); - break; - } + switch (flowContext.state()) { + case FlowContext.STATE_CONNECTING: + case FlowContext.STATE_CONNECTED: + disconnect(flowContext); + break; } } } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); } } }
--- a/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/ParallelFlowWorker.java Fri Apr 03 15:08:47 2020 +0200 @@ -7,7 +7,10 @@ import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; -import java.util.*; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; @@ -67,7 +70,6 @@ this.eventsQueueWaitTime = eventsQueueWaitTime; } - @Override protected void removeFlowContext(FlowContext flowContext) { if (flowContext != null) { flowIndex.remove(flowContext); @@ -82,7 +84,7 @@ } private void waitCloseAllConnections() { - closeAllConnections = true; + /*closeAllConnections = true; synchronized (lock) { while (!flowIndex.isEmpty()) { try { @@ -92,7 +94,8 @@ } } - closeAllConnections = false; + closeAllConnections = false;*/ + throw new RuntimeException("Not implemented."); } /* @Override @@ -168,10 +171,8 @@ } private void makeFirst(LocalFlowContext flowContext) { - synchronized (lock) { - flowIndex.remove(flowContext); - flowIndex.addFirst(flowContext); - } + flowIndex.remove(flowContext); + flowIndex.addFirst(flowContext); } private void addToQueue(LocalFlowContext flowContext, Event event) { @@ -194,13 +195,10 @@ } if (newEvent != null) { - synchronized (lock) { - try { - eventsQueue.put(newEvent); - lock.notifyAll(); - } catch (Exception e) { - logger.debug("Unable to add event to queue. " + e.getMessage(), e); - } + try { + eventsQueue.put(newEvent); + } catch (Exception e) { + logger.debug("Unable to add event to queue. " + e.getMessage(), e); } } } @@ -271,23 +269,16 @@ @Override public void run() { - synchronized (lock) { - working = true; - while (working) { - try { - try { - lock.wait(eventsQueueWaitTime); - } catch (InterruptedException ignore) { - } - - Event event; - while ((event = eventsQueue.poll()) != null) { - processEvent(event); - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } + working = true; + while (working) { + try { + Event event; + while ((event = eventsQueue.poll()) != null) { + processEvent(event); + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage(), e); } } }
--- a/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SynchFlowWorker.java Fri Apr 03 15:08:47 2020 +0200 @@ -75,7 +75,6 @@ return; } else if (event.getType() == SessionPayloadEvent.TYPE) { FlowContext flowContext = flowContext(sessEvent); - if (flowContext != null) { if (flowContext.blocked) { return; @@ -99,7 +98,7 @@ logger.debug("DataLoopEnd received."); } - closeAllConnections(); + disconnectAllConnections(); filterChain.reset(); } else if (event.getType() == DataEvents.DataEnd.TYPE) { if (logger.isDebugEnabled()) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -2,13 +2,14 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.FlowContext; import java.io.IOException; /** * @author Mirosław Hawrot */ -public interface ChannelContext<T> { +public interface ChannelContext { boolean isBidirectional(); @@ -42,8 +43,8 @@ SessionInfo getSessionInfo(); - void setAttachment(T attachment); + void setFlowContext(FlowContext attachment); - T getAttachment(); + FlowContext getFlowContext(); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.FlowContext; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; @@ -11,7 +12,7 @@ import java.util.LinkedList; import java.util.Queue; -public abstract class NioChannelContext<T, K> implements ChannelContext<K> { +public abstract class NioChannelContext<T> implements ChannelContext { protected final NioEmitterWorker worker; @@ -29,7 +30,7 @@ protected SelectionKey key; - private K attachment; + private FlowContext flowContext; public NioChannelContext(NioEmitterWorker worker, T channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.worker = worker; @@ -94,12 +95,12 @@ } @Override - public K getAttachment() { - return attachment; + public FlowContext getFlowContext() { + return flowContext; } @Override - public void setAttachment(K attachment) { - this.attachment = attachment; + public void setFlowContext(FlowContext attachment) { + this.flowContext = attachment; } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Fri Apr 03 15:08:47 2020 +0200 @@ -3,6 +3,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; import com.passus.net.utils.AddressUtils; +import com.passus.st.client.FlowContext; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; @@ -16,7 +17,7 @@ /** * @author Mirosław Hawrot */ -public class NioChannelContext2<K> implements ChannelContext<K> { +public class NioChannelContext2 implements ChannelContext { private final NioEmitterWorker2 worker; @@ -34,7 +35,7 @@ private SelectionKey key; - private K attachment; + private FlowContext flowContext; /** * Usunac @@ -125,12 +126,12 @@ } @Override - public K getAttachment() { - return attachment; + public FlowContext getFlowContext() { + return flowContext; } @Override - public void setAttachment(K attachment) { - this.attachment = attachment; + public void setFlowContext(FlowContext flowContext) { + this.flowContext = flowContext; } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -8,7 +8,7 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -public class NioDatagramChannelContext<T> extends NioChannelContext<DatagramChannel, T> { +public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> { public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -11,7 +11,7 @@ /** * @author Mirosław Hawrot */ -public class NioSocketChannelContext<T> extends NioChannelContext<SocketChannel, T> { +public class NioSocketChannelContext extends NioChannelContext<SocketChannel> { public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -3,7 +3,7 @@ import com.passus.data.ByteBuff; import com.passus.net.MACAddress; import com.passus.net.SocketAddress; -import com.passus.pcap.Pcap; +import com.passus.st.client.FlowContext; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; @@ -12,7 +12,7 @@ import java.util.LinkedList; import java.util.Queue; -public class UnidirectionalPcapChannelContext<K> implements ChannelContext<K> { +public class UnidirectionalPcapChannelContext implements ChannelContext { private static final int DEFAULT_BUFFER_SIZE = 65 * 1024; @@ -24,7 +24,7 @@ final byte[] buffer; - private K attachment; + private FlowContext flowContext; private final EmitterHandler handler; @@ -99,13 +99,13 @@ } @Override - public void setAttachment(K attachment) { - this.attachment = attachment; + public void setFlowContext(FlowContext attachment) { + this.flowContext = attachment; } @Override - public K getAttachment() { - return attachment; + public FlowContext getFlowContext() { + return flowContext; } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -2,6 +2,7 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; +import com.passus.st.client.FlowContext; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; @@ -12,7 +13,7 @@ import static com.passus.st.emitter.StatelessTasks.CLOSE_TASK; import static com.passus.st.emitter.StatelessTasks.FLUSH_TASK; -public abstract class AbstractChannelContext<K> implements ChannelContext<K> { +public abstract class AbstractChannelContext implements ChannelContext { protected final Connection connection; @@ -20,7 +21,7 @@ private final Queue<byte[]> dataQueue; - private K attachment; + private FlowContext flowContext; public AbstractChannelContext(Connection connection) { this.connection = connection; @@ -77,11 +78,11 @@ } @Override - public K getAttachment() { - return attachment; + public FlowContext getFlowContext() { + return flowContext; } - public void setAttachment(K attachment) { - this.attachment = attachment; + public void setFlowContext(FlowContext attachment) { + this.flowContext = attachment; } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -1,6 +1,6 @@ package com.passus.st.emitter.socket; -public class DatagramSocketChannelContext<K> extends AbstractChannelContext<K> { +public class DatagramSocketChannelContext extends AbstractChannelContext { public DatagramSocketChannelContext(Connection connection) { super(connection);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java Fri Apr 03 15:08:47 2020 +0200 @@ -1,6 +1,6 @@ package com.passus.st.emitter.socket; -public class SocketChannelContext<K> extends AbstractChannelContext<K> { +public class SocketChannelContext extends AbstractChannelContext { public SocketChannelContext(Connection connection) { super(connection);
--- a/stress-tester/src/test/java/com/passus/st/client/AsynchFlowWorkerTest.java Thu Apr 02 15:34:59 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,78 +0,0 @@ -package com.passus.st.client; - -import com.passus.commons.service.ServiceUtils; -import com.passus.st.AbstractWireMockTest; -import com.passus.st.emitter.RuleBasedSessionMapper; -import com.passus.st.emitter.nio.NioEmitter; -import com.passus.st.utils.EventUtils; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; - -public class AsynchFlowWorkerTest extends AbstractWireMockTest { - - private NioEmitter prepareEmitter(String mapperRule) throws Exception { - RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); - sessionMapper.addRule(mapperRule); - - NioEmitter emitter = new NioEmitter(); - emitter.setSessionMapper(sessionMapper); - return emitter; - } - - @BeforeMethod - public void beforeMethod() { - String content = "test"; - stubFor(post(urlEqualTo("/bskonl/transfers/anytransfer/newtransfer.html")) - .willReturn(aResponse() - .withHeader("Content-Type", "text/plain") - .withHeader("Content-Length", "" + content.length()) - .withBody(content))); - } - - @Test(enabled = false) - public void testHandle() throws Exception { - Map<String, Object> props = new HashMap<>(); - props.put("allowPartialSession", true); - props.put("ports", 4214); - List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); - assertEquals(4, events.size()); - - NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + port()); - emitter.start(); - - TestHttpClientListener listener = new TestHttpClientListener(); - - AsynchFlowWorker worker = new AsynchFlowWorker(emitter, "test", 0); - try { - worker.setListener(listener); - worker.start(); - - SessionEvent sessionEvent = (SessionEvent) events.get(0); - SessionStatusEvent statusEvent = new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED); - statusEvent.setTimestamp(sessionEvent.getTimestamp()); - worker.handle(statusEvent); - - events.forEach(worker::handle); - - worker.join(); - assertTrue(listener.size() > 0); - assertTrue(listener.get(0) instanceof TestHttpClientListener.ResponseReceivedEvent); - TestHttpClientListener.ResponseReceivedEvent event = (TestHttpClientListener.ResponseReceivedEvent) listener.get(0); - String responseStr = event.getResponse().toString(); - assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); - assertTrue(responseStr.endsWith("test")); - } finally { - ServiceUtils.stopQuietly(emitter); - } - } - -} \ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowWorkerBaseTest.java Fri Apr 03 15:08:47 2020 +0200 @@ -128,7 +128,7 @@ TestFlowWorker worker = worker(); ChannelContext channelContext = makeConnected(worker, session); FlowContext flowContext = worker.flowContext(session); - when(channelContext.getAttachment()).thenReturn(flowContext); + when(channelContext.getFlowContext()).thenReturn(flowContext); doAnswer((Answer<Void>) invocation -> { worker.channelInactive(channelContext);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Fri Apr 03 15:08:47 2020 +0200 @@ -9,7 +9,6 @@ import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseEncoder; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import com.passus.st.utils.EventUtils; @@ -19,7 +18,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -154,7 +152,7 @@ protected boolean bidirectional = true; - private Object attachment; + private FlowContext flowContext; public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.emitter = emitter; @@ -224,13 +222,13 @@ } @Override - public Object getAttachment() { - return attachment; + public FlowContext getFlowContext() { + return flowContext; } @Override - public void setAttachment(Object attachment) { - this.attachment = attachment; + public void setFlowContext(FlowContext flowContext) { + this.flowContext = flowContext; } }
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Apr 02 15:34:59 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Fri Apr 03 15:08:47 2020 +0200 @@ -4,15 +4,14 @@ import com.passus.commons.utils.ArrayUtils; import com.passus.net.netflow.Netflow9; import com.passus.net.netflow.Netflow9Decoder; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.Protocols; import com.passus.st.client.Event; import com.passus.st.client.FlowExecutor; import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.utils.EventUtils; +import com.passus.st.utils.server.DefaultServerListener; import com.passus.st.utils.server.SimpleDatagramServer; -import com.passus.st.utils.server.DefaultServerListener; import org.testng.annotations.Test; import java.util.HashMap; @@ -69,4 +68,14 @@ } + @Test(enabled = false) + public void testHandle_Netflow2() throws Exception { + for (int i = 0; i < 1000; i++) { + if (i % 10 == 0) { + System.out.println(i); + } + + testHandle_Netflow(); + } + } }