Mercurial > stress-tester
changeset 521:dd71d49065ad
HttpSychClientWorker bugfixes
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Log4jConfigurationFactory.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Log4jConfigurationFactory.java Fri Aug 25 08:46:27 2017 +0200 @@ -59,6 +59,7 @@ public static void enableFactory(Level level) { System.setProperty("log4j.configurationFactory", "com.passus.st.Log4jConfigurationFactory"); + System.setProperty("Log4jContextSelector", "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"); Log4jConfigurationFactory.level = level; }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java Fri Aug 25 08:46:27 2017 +0200 @@ -101,57 +101,64 @@ this.sleepFactor = sleepFactor; } - protected final void changeFlowState(HttpFlowContext context, int state) { - int oldState = context.state; + protected final void changeFlowState(HttpFlowContext flowContext, int state) { + if (flowContext.state == state) { + return; + } + + int oldState = flowContext.state; if (logger.isDebugEnabled()) { - debug(context, "Flow status changing {} -> {}.", - context.stateString(), HttpFlowContext.contextStateToString(state) + debug(flowContext, "Flow status changing {} -> {}.", + flowContext.stateString(), HttpFlowContext.contextStateToString(state) ); } switch (state) { case HttpFlowContext.STATE_CONNECTING: - context.clear(); + flowContext.clear(); break; case HttpFlowContext.STATE_CONNECTED: - context.decoder = new HttpFullMessageDecoder(); - context.decoder.setDecodeRequest(false); - context.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY); + flowContext.decoder = new HttpFullMessageDecoder(); + flowContext.decoder.setDecodeRequest(false); + flowContext.buffer = new HeapByteBuff(HttpFlowContext.INIT_BUFFER_CAPACITY); break; case HttpFlowContext.STATE_ERROR: - context.sentEvent = null; + flowContext.sentEvent = null; break; case HttpFlowContext.STATE_RESP_RECEIVED: - context.sentEvent = null; - context.receivedStartTimestamp = -1; + flowContext.sentEvent = null; + flowContext.receivedStartTimestamp = -1; break; case HttpFlowContext.STATE_DISCONNECTING: - if (context.state < HttpFlowContext.STATE_DISCONNECTING) { - context.clear(); - if (context.channelContext != null) { + if (flowContext.state < HttpFlowContext.STATE_DISCONNECTING) { + if (flowContext.channelContext != null) { try { - context.channelContext.close(); + flowContext.channelContext.close(); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); } } + } else { + changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTED); } } else { return; } break; case HttpFlowContext.STATE_DISCONNECTED: - context.state = HttpFlowContext.STATE_DISCONNECTED; - context.timeout = -1; - context.clear(); + flowContext.state = HttpFlowContext.STATE_DISCONNECTED; + flowContext.timeout = -1; + flowContext.clear(); + removeFlowContext(flowContext); + flowStateChanged(flowContext, oldState); return; } - long timeout = timeouts.get(context.state); - context.timeout = System.currentTimeMillis() + timeout; - context.state = state; - flowStateChanged(context, oldState); + long timeout = timeouts.get(flowContext.state); + flowContext.timeout = System.currentTimeMillis() + timeout; + flowContext.state = state; + flowStateChanged(flowContext, oldState); } protected void flowStateChanged(HttpFlowContext context, int oldState) { @@ -186,14 +193,16 @@ } protected HttpFlowContext register(SessionInfo session) { - if (sessions.containsKey(session)) { - logger.warn("Unable to register session '" + session + "'. Session already registered."); - return null; + synchronized (lock) { + if (sessions.containsKey(session)) { + logger.warn("Unable to register session '" + session + "'. Session already registered."); + return null; + } + + HttpFlowContext flowContext = createFlowContext(session); + sessions.put(session, flowContext); + return flowContext; } - - HttpFlowContext flowContext = createFlowContext(session); - sessions.put(session, flowContext); - return flowContext; } protected HttpFlowContext connect(SessionEvent sessionEvent) { @@ -201,14 +210,18 @@ } protected HttpFlowContext connect(SessionInfo session) { - try { - HttpFlowContext flowContext = register(session); - emitter.connect(session, this, index); - return flowContext; - } catch (Exception e) { - logger.error(e.getMessage(), e); + synchronized (lock) { + try { + HttpFlowContext flowContext = register(session); + if (flowContext != null) { + emitter.connect(session, this, index); + return flowContext; + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + return null; } - return null; } @Override @@ -217,7 +230,7 @@ for (Map.Entry<SessionInfo, HttpFlowContext> entry : sessions.entrySet()) { HttpFlowContext flowContext = entry.getValue(); try { - closeSession(flowContext, false); + closeSession(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { debug(flowContext, e.getMessage(), e); @@ -237,7 +250,7 @@ protected void close(HttpFlowContext flowContext) { synchronized (lock) { try { - closeSession(flowContext, true); + closeSession(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); @@ -251,7 +264,7 @@ synchronized (lock) { try { HttpFlowContext flowContext = flowContext(session); - closeSession(flowContext, true); + closeSession(flowContext); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e); @@ -260,38 +273,41 @@ } } - protected void closeSession(HttpFlowContext flowContext, boolean remove) { - if (flowContext != null) { - changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING); - if (remove) { - removeFlowContext(flowContext); + protected void closeSession(HttpFlowContext flowContext) { + synchronized (lock) { + if (flowContext != null) { + changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING); } } } protected void removeFlowContext(HttpFlowContext flowContext) { - sessions.remove(flowContext.sessionInfo()); + synchronized (lock) { + debug(flowContext, "removeFlowContext"); + sessions.remove(flowContext.sessionInfo()); + } } protected void reconnect(HttpFlowContext flowContext) { - try { - if (logger.isDebugEnabled()) { - debug(flowContext, "Reconnect (state: {}).", flowContext.stateString()); - } + synchronized (lock) { + try { + if (logger.isDebugEnabled()) { + debug(flowContext, "Reconnect (state: {}).", flowContext.stateString()); + } - SessionInfo session = flowContext.sessionInfo(); - changeFlowState(flowContext, HttpFlowContext.STATE_CONNECTING); - emitter.connect(session, this, index); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); + SessionInfo session = flowContext.sessionInfo(); + changeFlowState(flowContext, HttpFlowContext.STATE_CONNECTING); + emitter.connect(session, this, index); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); + } } - } protected void closeAllConnections() { synchronized (lock) { for (HttpFlowContext flowContext : sessions.values()) { - closeSession(flowContext, true); + closeSession(flowContext); } } } @@ -343,7 +359,6 @@ } lock.notifyAll(); - } } @@ -519,25 +534,26 @@ long now = System.currentTimeMillis(); req.setTag(TAG_TIME_START, now); context.sendStartTimestamp = now; + changeFlowState(context, HttpFlowContext.STATE_REQ_SENT); + context.sentEvent = event; + context.channelContext.writeAndFlush(context.buffer); if (logger.isDebugEnabled()) { - debug(context, "Request '{}' sent ({} bytes).", req.getUrl(), context.buffer.length()); + debug(context, "Request '{}' sending ({} bytes).", req.getUrl(), context.buffer.length()); } + context.buffer.clear(); - - context.sentEvent = event; - changeFlowState(context, HttpFlowContext.STATE_REQ_SENT); + + return true; } catch (Exception e) { if (logger.isDebugEnabled()) { debug(context, e.getMessage(), e); } - - return false; } } } - return true; + return false; } protected void processTimeouts() { @@ -560,7 +576,7 @@ case HttpFlowContext.STATE_CONNECTED: case HttpFlowContext.STATE_REQ_SENT: case HttpFlowContext.STATE_ERROR: - closeSession(flowContext, true); + closeSession(flowContext); break; case HttpFlowContext.STATE_RESP_RECEIVED: //Dziwny blad nie powinien wystepowac @@ -617,15 +633,19 @@ message = String.format(message, args); } + SessionInfo session = flowContext.sessionInfo(); + int localPort = session.getSrcPort(); if (args.length == 0) { - logger.log(level, message + " [{}]", flowContext.sessionInfo()); + logger.log(level, message + " [{},{}]", session, localPort); } else { - Object[] logArgs = new Object[args.length + 1]; + Object[] logArgs = new Object[args.length + 2]; for (int i = 0; i < args.length; i++) { logArgs[i] = args[i]; } - logArgs[args.length] = flowContext.sessionInfo(); - logger.log(level, message + " [{}]", logArgs); + + logArgs[logArgs.length - 2] = session; + logArgs[logArgs.length - 1] = localPort; + logger.log(level, message + " [{},{}]", logArgs); } } }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Fri Aug 25 08:46:27 2017 +0200 @@ -1,12 +1,12 @@ package com.passus.st.client.http; +import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; import com.passus.st.client.DataEvents.DataEnd; import com.passus.st.client.DataEvents.DataLoopEnd; import com.passus.st.client.Event; import com.passus.st.client.SessionEvent; import com.passus.st.client.SessionStatusEvent; -import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.Emitter; import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; @@ -25,6 +25,9 @@ private long eventsQueueWaitTime = 100; + /** + * Context dla ktorego wykonywana jest operacja. + */ private HttpFlowContext currFlowContext; private boolean loopEnd = false; @@ -38,12 +41,23 @@ return working; } + public long getEventsQueueWaitTime() { + return eventsQueueWaitTime; + } + + public void setEventsQueueWaitTime(long eventsQueueWaitTime) { + Assert.greaterThanZero(eventsQueueWaitTime, "eventsQueueWaitTime"); + this.eventsQueueWaitTime = eventsQueueWaitTime; + } + @Override - protected void closeSession(HttpFlowContext flowContext, boolean remove) { - if (flowContext != null) { - super.closeSession(flowContext, remove); - - if (currFlowContext == flowContext) { + protected void flowStateChanged(HttpFlowContext context, int oldState) { + logger.debug("flowStateChanged {},{}", context == currFlowContext, context.state()); + if (context == currFlowContext) { + if (context.state() == HttpFlowContext.STATE_CONNECTED + || context.state() == HttpFlowContext.STATE_RESP_RECEIVED + || context.state() == HttpFlowContext.STATE_ERROR + || context.state() == HttpFlowContext.STATE_DISCONNECTED) { currFlowContext = null; } } @@ -64,14 +78,6 @@ } @Override - public void errorOccured(ChannelContext context, Throwable cause) throws Exception { - synchronized (lock) { - currFlowContext = null; - super.errorOccured(context, cause); - } - } - - @Override protected void closeAllConnections() { synchronized (lock) { boolean wait; @@ -93,40 +99,43 @@ } while (wait); super.closeAllConnections(); - do { + while (!sessions.isEmpty()) { try { lock.wait(100); } catch (Exception e) { } - } while (!sessions.isEmpty()); + } } } private boolean pollNext() { - Event event = eventsQueue.peek(); + if (currFlowContext != null) { + return false; + } + + Event event = eventsQueue.poll(); if (event != null) { sleep(event); + if (logger.isTraceEnabled()) { + logger.trace("Event processing: {}", event); + } + if (event instanceof SessionEvent) { if (event.getType() == SessionStatusEvent.HTTP_SESSION_STATUS) { - eventsQueue.poll(); SessionStatusEvent statusEvent = (SessionStatusEvent) event; if (statusEvent.getStatus() == SessionStatusEvent.STATUS_ESTABLISHED) { try { - SessionInfo session = statusEvent.getSessionInfo(); - currFlowContext = register(session); - emitter.connect(session, this, index); + currFlowContext = connect(statusEvent); } catch (Exception e) { logger.error(e.getMessage(), e); } - return false; + return (currFlowContext == null); } else if (statusEvent.getStatus() == SessionStatusEvent.STATUS_CLOSED) { currFlowContext = flowContext((SessionEvent) event); if (currFlowContext != null) { if (currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) { - SessionInfo session = statusEvent.getSessionInfo(); - close(session); - currFlowContext = null; + close(statusEvent); } } } @@ -134,22 +143,25 @@ return true; } else if (event.getType() == HttpSessionPayloadEvent.HTTP_SESSION_PAYLOAD) { SessionEvent sessEvent = (SessionEvent) event; - currFlowContext = flowContext(sessEvent); - if (currFlowContext != null) { - switch (currFlowContext.state) { + HttpFlowContext flowContext = flowContext(sessEvent); + if (flowContext != null) { + switch (flowContext.state) { case HttpFlowContext.STATE_CONNECTED: case HttpFlowContext.STATE_RESP_RECEIVED: case HttpFlowContext.STATE_ERROR: - eventsQueue.poll(); - send(currFlowContext, (HttpSessionPayloadEvent) event); - return false; + currFlowContext = flowContext; + if (send(flowContext, (HttpSessionPayloadEvent) event)) { + return false; + } else { + currFlowContext = null; + return true; + } case HttpFlowContext.STATE_DISCONNECTING: case HttpFlowContext.STATE_DISCONNECTED: if (connectPartialSession) { currFlowContext = connect(sessEvent); return false; } else { - eventsQueue.poll(); return true; } default: @@ -157,37 +169,35 @@ } } else if (connectPartialSession) { currFlowContext = connect(sessEvent); - return false; - } else { - eventsQueue.poll(); - return true; + if (currFlowContext != null) { + eventsQueue.addFirst(sessEvent); + return false; + } else { + return true; + } } + + return true; } else { - eventsQueue.poll(); return true; } - } else { - eventsQueue.poll(); - if (event.getType() == DataLoopEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataLoopEnd received."); - } + } else if (event.getType() == DataLoopEnd.TYPE) { + if (logger.isDebugEnabled()) { + logger.debug("DataLoopEnd received."); + } - loopEnd = true; - closeAllConnections(); - filterChain.reset(); - loopEnd = false; - return true; - } else if (event.getType() == DataEnd.TYPE) { - if (logger.isDebugEnabled()) { - logger.debug("DataEnd received. Deactivation."); - } + loopEnd = true; + closeAllConnections(); + filterChain.reset(); + loopEnd = false; + return true; + } else if (event.getType() == DataEnd.TYPE) { + if (logger.isDebugEnabled()) { + logger.debug("DataEnd received. Deactivation."); + } - working = false; - } + working = false; } - } else { - currFlowContext = null; } return false; @@ -204,19 +214,14 @@ } catch (InterruptedException ignore) { } - boolean pollNext = false; + boolean nextPoll = true; do { if (loopEnd) { break; } - if (currFlowContext == null) { - pollNext = pollNext(); - } else if (currFlowContext.state != HttpFlowContext.STATE_CONNECTING - && currFlowContext.state != HttpFlowContext.STATE_REQ_SENT) { - pollNext = pollNext(); - } - } while (pollNext); + nextPoll = pollNext(); + } while (nextPoll); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(e.getMessage(), e);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Fri Aug 25 08:46:27 2017 +0200 @@ -6,11 +6,12 @@ import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; import java.io.IOException; -import java.nio.Buffer; import java.nio.ByteBuffer; -import java.nio.channels.Pipe; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -24,7 +25,7 @@ private final SocketChannel channel; - private final Pipe pipe; + private final Queue<ByteBuffer> dataQueue; private SocketAddress localAddress; @@ -32,24 +33,28 @@ private SelectionKey key; - public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo, Pipe pipe) { + public static AtomicInteger writes = new AtomicInteger(); + + public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.worker = worker; this.channel = channel; this.remoteAddress = remoteAddress; this.sessionInfo = sessionInfo; - this.pipe = pipe; + this.dataQueue = new LinkedList<>(); + } - Pipe pipe() { - return pipe; + Queue<ByteBuffer> dataQueue() { + return dataQueue; } void selectionKey(SelectionKey key) { this.key = key; } - private void writeToPipe(ByteBuffer buffer) throws IOException { - pipe.sink().write(buffer); + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + writes.incrementAndGet(); } @Override @@ -64,17 +69,17 @@ @Override public void write(byte[] data, int offset, int length) throws IOException { - writeToPipe(ByteBuffer.wrap(data, offset, length)); + addToQeueu(ByteBuffer.wrap(data, offset, length)); } @Override public void write(ByteBuff data) throws IOException { - writeToPipe(data.toNioByteBuffer()); + addToQeueu(data.toNioByteBuffer()); } @Override public void flush() { - NioEmitterWorker.setOpWrite(key); + worker.flush(key); } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Fri Aug 25 08:46:27 2017 +0200 @@ -16,7 +16,6 @@ import java.io.IOException; import java.net.ConnectException; import java.nio.ByteBuffer; -import java.nio.channels.Pipe; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -141,7 +140,12 @@ } public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { - tasks.add(new TaskConnect(sessionInfo, handler)); + tasks.add(new ConnectTask(sessionInfo, handler)); + selector.wakeup(); + } + + void flush(SelectionKey key) { + tasks.add(new FlushTask(key)); selector.wakeup(); } @@ -174,13 +178,12 @@ channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); } - Pipe pipe = Pipe.open(); SocketAddress remoteAddress = connParams.getRemoteAddress(); if (remoteAddress == null) { remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); } - NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo, pipe); + NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); KeyContext keyContext = new KeyContext(channelContext, handler); SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); try { @@ -229,7 +232,7 @@ if (timeouted) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Connection to '" + keyContext.channelContext.getRemoteAddress() + "' timed out."); + LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); } throw new ConnectException("Connection timed out."); @@ -251,12 +254,12 @@ metric.addBindSocket(keyContext.channelContext.getLocalAddress()); } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + keyContext.handler.channelActive(keyContext.channelContext); setOpRead(key); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Connected to '" + keyContext.channelContext.getRemoteAddress() + "'."); - } } catch (Exception ex) { LOGGER.error(ex.getMessage(), ex); } @@ -265,27 +268,19 @@ private void doWrite(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); - ByteBuffer buffer = keyContext.buffer; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Writing to '" + keyContext.channelContext.getRemoteAddress() + "'."); + LOGGER.debug("Writing to '{}'.", keyContext.channelContext.getRemoteAddress()); } - keyContext.buffer.clear(); - - Pipe.SourceChannel source = keyContext.channelContext.pipe().source(); - try { - source.configureBlocking(false); - } catch (Exception e) { - } - - int readed; + Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); int written = 0; try { - while ((readed = source.read(buffer)) > 0) { - buffer.flip(); + ByteBuffer buffer; + while (!queue.isEmpty()) { + buffer = queue.poll(); while (buffer.hasRemaining()) { int res = socketChannel.write(buffer); + if (res == -1) { doClose(key); return; @@ -297,8 +292,6 @@ written += res; } - - buffer.clear(); } } catch (Exception e) { doCatchException(key, e); @@ -312,6 +305,7 @@ //TODO Operacje na handlerach powinny przechodzic przez Executor try { + NioChannelContext.writes.decrementAndGet(); keyContext.handler.dataWritten(keyContext.channelContext); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); @@ -467,7 +461,7 @@ } void requestClose(SelectionKey key) { - tasks.add(new TaskClose(key)); + tasks.add(new CloseTask(key)); key.selector().wakeup(); } @@ -476,24 +470,27 @@ int selected = 0; working = true; while (working) { + if (!tasks.isEmpty()) { + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + } else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doConnect(taskConn.sessionInfo, taskConn.handler); + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); + } + } + } + try { selected = selector.select(selectTimeout); } catch (IOException ex) { LOGGER.warn(ex.getMessage(), ex); } - if (!tasks.isEmpty()) { - Task task; - while ((task = tasks.poll()) != null) { - if (task.code == Task.CLOSE) { - doClose(((TaskClose) task).key); - } else if (task.code == Task.CONNECT) { - TaskConnect taskConn = (TaskConnect) task; - doConnect(taskConn.sessionInfo, taskConn.handler); - } - } - } - if (selected > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { @@ -535,6 +532,7 @@ public static final int CLOSE = 1; public static final int CONNECT = 2; + public static final int FLUSH = 3; private final int code; @@ -544,12 +542,12 @@ } - private static class TaskConnect extends Task { + private final static class ConnectTask extends Task { private final SessionInfo sessionInfo; private final EmitterHandler handler; - public TaskConnect(SessionInfo sessionInfo, EmitterHandler handler) { + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { super(CONNECT); this.sessionInfo = sessionInfo; this.handler = handler; @@ -557,14 +555,25 @@ } - private static class TaskClose extends Task { + private final static class CloseTask extends Task { private final SelectionKey key; - public TaskClose(SelectionKey key) { + public CloseTask(SelectionKey key) { super(CLOSE); this.key = key; } } + + private final static class FlushTask extends Task { + + private final SelectionKey key; + + public FlushTask(SelectionKey key) { + super(FLUSH); + this.key = key; + } + + } }
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpClientTest.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpClientTest.java Fri Aug 25 08:46:27 2017 +0200 @@ -10,7 +10,6 @@ import com.passus.st.client.SessionEvent; import com.passus.st.client.SessionStatusEvent; import com.passus.st.client.TestHttpClientListener; -import com.passus.st.client.TestHttpClientListener.HttpClientEventType; import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent; import com.passus.st.utils.EventUtils; import java.util.List; @@ -19,6 +18,7 @@ import org.testng.annotations.Test; import static com.github.tomakehurst.wiremock.client.WireMock.post; import com.passus.commons.service.ServiceUtils; +import java.util.LinkedList; import org.testng.annotations.BeforeMethod; /** @@ -56,7 +56,6 @@ NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + PORT); emitter.start(); - TestHttpClientListener listner = new TestHttpClientListener(); HttpClient client = new HttpClient(emitter); @@ -83,7 +82,7 @@ } } - @Test + @Test(enabled = true) public void testHandle_ConnectPartialSession() throws Exception { Properties props = new Properties(); props.put("allowPartialSession", "true"); @@ -107,7 +106,6 @@ client.join(); assertTrue(listner.size() > 0); - assertTrue(listner.size() > 0); assertTrue(listner.get(0) instanceof ResponseReceivedEvent); } finally { ServiceUtils.stopQuietly(client); @@ -115,4 +113,47 @@ } } + @Test(enabled = true) + public void testHandle_ThreeLoops() throws Exception { + Properties props = new Properties(); + props.put("allowPartialSession", "true"); + props.put("ports", "4214"); + LinkedList<Event> events = new LinkedList<>(EventUtils.readEvents("pcap/http/http_req_resp.pcap", props)); + assertEquals(4, events.size()); + Event dataEnd = events.removeLast(); //Usuwamy DataEnd + + SessionEvent sessionEvent = (SessionEvent) events.get(0); + events.addFirst(new SessionStatusEvent(sessionEvent.getSessionInfo(), SessionStatusEvent.STATUS_ESTABLISHED)); + + NioEmitter emitter = prepareEmitter("10.87.110.40:4214 -> " + HOST + ":" + PORT); + emitter.start(); + TestHttpClientListener listner = new TestHttpClientListener(); + + HttpClient client = new HttpClient(emitter); + try { + client.addListener(listner); + client.start(); + + for (int i = 0; i < 3; i++) { + events.forEach((event) -> { + client.handle(event); + }); + } + client.handle(dataEnd); + client.join(); + + assertEquals(3, listner.size()); + + for (int i = 0; i < 3; i++) { + assertTrue(listner.get(i) instanceof ResponseReceivedEvent); + ResponseReceivedEvent event = (ResponseReceivedEvent) listner.get(i); + String responseStr = event.getResponse().toString(); + assertTrue(responseStr.startsWith("HTTP/1.1 200 OK")); + assertTrue(responseStr.endsWith("test")); + } + } finally { + ServiceUtils.stopQuietly(client); + ServiceUtils.stopQuietly(emitter); + } + } }
--- a/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpMessageModificationFilterTest.java Wed Aug 23 10:11:02 2017 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/http/filter/HttpMessageModificationFilterTest.java Fri Aug 25 08:46:27 2017 +0200 @@ -139,7 +139,7 @@ } - @Test + @Test(enabled = true) public void testComplexExpression() throws Exception { String filterConfig = "filters:\n" + " - type: modifyMessage\n"