Mercurial > stress-tester
changeset 813:3a7411ab09e9
HttpSynchClientWorkerTest.testHandle_SimpleRequestResponse
author | Devel 2 |
---|---|
date | Mon, 15 Jan 2018 12:26:11 +0100 |
parents | 43eb11e7f86b |
children | f27093f7af45 |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/Assert.java |
diffstat | 5 files changed, 171 insertions(+), 148 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java Mon Jan 15 09:23:05 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java Mon Jan 15 12:26:11 2018 +0100 @@ -85,6 +85,16 @@ this.listeners.addAll(listeners); } + public void addListener(HttpClientListener listener) { + Assert.notNull(listener, "listener"); + this.listeners.add(listener); + } + + public void removeListener(HttpClientListener listener) { + Assert.notNull(listener, "listener"); + this.listeners.remove(listener); + } + @Override public boolean isCollectMetrics() { return metric != null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Mon Jan 15 09:23:05 2018 +0100 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Mon Jan 15 12:26:11 2018 +0100 @@ -133,7 +133,7 @@ /** * Returns true if next event should be processed immediately. - * @return + * @return boolean */ private boolean pollNext() { if (currFlowContext != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java Mon Jan 15 09:23:05 2018 +0100 +++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java Mon Jan 15 12:26:11 2018 +0100 @@ -3,10 +3,10 @@ import com.passus.config.Configuration; import com.passus.data.ByteBuff; import com.passus.data.HeapByteBuff; +import com.passus.net.SocketAddress; import com.passus.net.http.HttpRequest; import com.passus.net.http.HttpResponse; import com.passus.net.http.HttpResponseEncoder; -import com.passus.st.Log4jConfigurationFactory; import com.passus.st.client.Event; import com.passus.st.client.SessionEvent; import com.passus.st.client.SessionStatusEvent; @@ -17,77 +17,31 @@ import com.passus.st.emitter.PassThroughSessionMapper; import com.passus.st.emitter.SessionInfo; import com.passus.st.emitter.SessionMapper; -import com.passus.st.emitter.nio.NioChannelContext; -import com.passus.st.emitter.nio.NioEmitterWorker; import com.passus.st.metric.MetricsContainer; import com.passus.st.utils.EventUtils; import java.io.IOException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Arrays; +import java.nio.ByteBuffer; +import java.util.LinkedList; import java.util.List; import java.util.Properties; -import org.apache.logging.log4j.Level; -import static org.testng.AssertJUnit.*; +import java.util.Queue; import org.testng.annotations.Test; +import static com.passus.st.utils.Assert.*; +import org.testng.annotations.AfterMethod; /** * * @author mikolaj.podbielski + * @author miroslaw.hawrot */ public class HttpSynchClientWorkerTest { - private static class LocalEmitterWorker extends NioEmitterWorker { - - public LocalEmitterWorker(int index) throws IOException { - super(index); - } - - @Override - public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public void setWorking(boolean b) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - protected void flush(SelectionKey key) { - KeyContext keyContext = (KeyContext) key.attachment(); - SessionInfo sessionInfo = keyContext.channelContext.getSessionInfo(); - HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) keyContext.handler; - HttpFlowContext flowContext = clientWorker.flowContext(sessionInfo); - HttpSessionPayloadEvent event = flowContext.sentEvent; - HttpRequest request = event.getRequest(); - HttpResponse response = event.getResponse(); - ByteBuff buff = new HeapByteBuff(); - HttpResponseEncoder.INSTANCE.encode(response, buff); - try { - keyContext.handler.dataReceived(keyContext.channelContext, buff); - } catch (Exception ex) { - ex.printStackTrace(); - } - System.out.println("flush"); - } - - @Override - protected void requestClose(SelectionKey key) { - System.out.println("requestClose"); - } - - @Override - public void writeMetrics(MetricsContainer container) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - } + private final TestHttpClientListener listner = new TestHttpClientListener(); private static class LocalEmitter implements Emitter { private SessionMapper sessionMapper = new PassThroughSessionMapper(); + private boolean started = false; @Override @@ -102,20 +56,36 @@ @Override public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException { - System.out.println("Emitter connect"); - System.out.println(session); - System.out.println(handler); - // doConnect -> channelRegistered - // selector - // doFinishConnect -> channelActive - NioEmitterWorker worker = new LocalEmitterWorker(workerIndex); - NioChannelContext channelContext = new NioChannelContext(worker, null, null, session); + LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session); try { handler.channelRegistered(channelContext); handler.channelActive(channelContext); - KeyContext keyContext = new KeyContext(channelContext, handler); - SelectionKey key = selectionKey(keyContext); - channelContext.selectionKey(key); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + protected void flush(LocalChannelContext channelContext) { + SessionInfo sessionInfo = channelContext.getSessionInfo(); + HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) channelContext.handler; + HttpFlowContext flowContext = clientWorker.flowContext(sessionInfo); + HttpSessionPayloadEvent event = flowContext.sentEvent; + + HttpRequest request = event.getRequest(); + HttpResponse response = event.getResponse(); + ByteBuff buff = new HeapByteBuff(); + HttpResponseEncoder.INSTANCE.encode(response, buff); + try { + clientWorker.dataReceived(channelContext, buff); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + protected void close(LocalChannelContext channelContext) { + try { + channelContext.handler.channelInactive(channelContext); + channelContext.handler.channelUnregistered(channelContext); } catch (Exception ex) { ex.printStackTrace(); } @@ -154,35 +124,95 @@ } } - private static class KeyContext { + private static class LocalChannelContext implements ChannelContext { + + private final LocalEmitter emitter; private final EmitterHandler handler; - private final ChannelContext channelContext; + private final SessionInfo sessionInfo; - public KeyContext(ChannelContext channelContext, EmitterHandler handler) { - this.channelContext = channelContext; + private final Queue<ByteBuffer> dataQueue; + + private SocketAddress localAddress; + + private SocketAddress remoteAddress; + + public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { + this.emitter = emitter; this.handler = handler; + this.remoteAddress = remoteAddress; + this.sessionInfo = sessionInfo; + this.dataQueue = new LinkedList<>(); + } + + @Override + public boolean isConnected() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isConnectionPending() { + throw new UnsupportedOperationException("Not supported yet."); + } + + private void addToQeueu(ByteBuffer buffer) throws IOException { + dataQueue.add(buffer); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + addToQeueu(ByteBuffer.wrap(data, offset, length)); + } + + @Override + public void write(ByteBuff data) throws IOException { + addToQeueu(data.toNioByteBuffer()); + } + + @Override + public void flush() throws IOException { + emitter.flush(this); + } + + @Override + public void close() throws IOException { + emitter.close(this); + } + + @Override + public SocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return sessionInfo; } } - // wisi w pętli pollNext - @Test(enabled = false) - public void testHandle1() throws Exception { - Log4jConfigurationFactory.enableFactory(Level.DEBUG); + @AfterMethod + public void afterMethod() { + listner.clear(); + } + @Test(enabled = true) + public void testHandle_SimpleRequestResponse() throws Exception { Properties props = new Properties(); props.put("allowPartialSession", "true"); props.put("ports", "4214"); List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props); assertEquals(4, events.size()); - TestHttpClientListener listner = new TestHttpClientListener(); LocalEmitter emitter = new LocalEmitter(); HttpSynchClientWorker worker = new HttpSynchClientWorker(emitter, "test", 0); - worker.setListeners(Arrays.asList(listner)); - //worker.setConnectPartialSession(true); + worker.addListener(listner); worker.start(); SessionEvent sessionEvent = (SessionEvent) events.get(0); @@ -190,51 +220,7 @@ events.forEach(worker::handle); worker.join(); - System.out.println(""); - } - - private static SelectionKey selectionKey(Object attachment) { - SelectionKey key = new XSelectionKey(); - key.attach(attachment); - return key; + assertHttpClientEvents(events, listner.events()); } - private static class XSelectionKey extends SelectionKey { - - @Override - public SelectableChannel channel() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public Selector selector() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public boolean isValid() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public void cancel() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public int interestOps() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public SelectionKey interestOps(int ops) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public int readyOps() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - } }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Mon Jan 15 09:23:05 2018 +0100 +++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java Mon Jan 15 12:26:11 2018 +0100 @@ -4,15 +4,12 @@ import com.passus.data.PooledAllocator; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event; -import com.passus.st.client.http.HttpSessionPayloadEvent; import com.passus.st.utils.EventUtils; -import static com.passus.st.utils.HttpMessageAssert.assertMessages; -import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; import java.io.File; import java.io.IOException; import java.util.List; import java.util.Properties; -import static org.testng.AssertJUnit.*; +import static com.passus.st.utils.Assert.*; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -48,25 +45,6 @@ }; } - private void assertEvents(FileEvents expectedEvents, List<Event> events) { - assertEquals(expectedEvents.events.size(), events.size()); - for (int i = 0; i < events.size(); i++) { - Event expectedEvent = expectedEvents.events.get(i); - Event event = events.get(i); - - assertEquals(expectedEvent.getType(), event.getType()); - if (event.getType() == HttpSessionPayloadEvent.TYPE) { - HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent; - HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; - - assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); - assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); - assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); - assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); - } - } - } - @Test(dataProvider = "pcapFiles") public void testRead(String pcapFile) throws Exception { FileEvents fileEvents = writeEvents(pcapFile); @@ -78,7 +56,7 @@ eventSource.start(); List<Event> events = handler.getEvents(); - assertEvents(fileEvents, events); + assertEvents(fileEvents.events, events); } finally { fileEvents.ncFile.delete(); } @@ -97,7 +75,7 @@ eventSource.start(); List<Event> events = handler.getEvents(); - assertEvents(fileEvents, events); + assertEvents(fileEvents.events, events); assertEquals(2, allocator.usedSize()); } finally { fileEvents.ncFile.delete();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java Mon Jan 15 09:23:05 2018 +0100 +++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java Mon Jan 15 12:26:11 2018 +0100 @@ -3,6 +3,19 @@ import com.passus.data.ByteString; import com.passus.data.ByteStringImpl; import com.passus.data.SliceByteString; +import com.passus.st.client.Event; +import com.passus.st.client.TestHttpClientListener; +import com.passus.st.client.TestHttpClientListener.HttpClientEvent; +import com.passus.st.client.TestHttpClientListener.HttpClientEventType; +import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent; +import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.source.NcEventSourceTest; +import static com.passus.st.utils.HttpMessageAssert.assertMessages; +import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; import org.testng.AssertJUnit; import static org.testng.AssertJUnit.assertEquals; @@ -27,4 +40,40 @@ assertEquals(null, expectedBs, actual); } + public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) { + expectedEvents = expectedEvents.stream() + .filter((e) -> e.getType() == HttpSessionPayloadEvent.TYPE) + .collect(Collectors.toList()); + + List<Event> events = new ArrayList<>(httpClientEvents.size()); + httpClientEvents.forEach((event) -> { + if (event.getType() == HttpClientEventType.RESPONSE_RECEIVED) { + ResponseReceivedEvent respReceived = (ResponseReceivedEvent) event; + HttpSessionPayloadEvent payloadEvent = new HttpSessionPayloadEvent(null, respReceived.getRequest(), respReceived.getResponse(), null); + events.add(payloadEvent); + } + }); + + assertEvents(expectedEvents, events); + } + + public static void assertEvents(List<Event> expectedEvents, List<Event> events) { + assertEquals(expectedEvents.size(), events.size()); + for (int i = 0; i < events.size(); i++) { + Event expectedEvent = expectedEvents.get(i); + Event event = events.get(i); + + assertEquals(expectedEvent.getType(), event.getType()); + if (event.getType() == HttpSessionPayloadEvent.TYPE) { + HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent; + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; + + assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest()); + assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse()); + } + } + } + }