changeset 813:3a7411ab09e9

HttpSynchClientWorkerTest.testHandle_SimpleRequestResponse
author Devel 2
date Mon, 15 Jan 2018 12:26:11 +0100
parents 43eb11e7f86b
children f27093f7af45
files stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/Assert.java
diffstat 5 files changed, 171 insertions(+), 148 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java	Mon Jan 15 09:23:05 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorker.java	Mon Jan 15 12:26:11 2018 +0100
@@ -85,6 +85,26 @@
         this.listeners.addAll(listeners);
     }
 
+    /**
+     * Adds a single listener to this worker's listener collection.
+     *
+     * @param listener listener to add; validated with {@code Assert.notNull} first
+     */
+    public void addListener(HttpClientListener listener) {
+        Assert.notNull(listener, "listener");
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Removes a single listener from this worker's listener collection.
+     *
+     * @param listener listener to remove; validated with {@code Assert.notNull} first
+     */
+    public void removeListener(HttpClientListener listener) {
+        Assert.notNull(listener, "listener");
+        this.listeners.remove(listener);
+    }
+    
     @Override
     public boolean isCollectMetrics() {
         return metric != null;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Mon Jan 15 09:23:05 2018 +0100
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Mon Jan 15 12:26:11 2018 +0100
@@ -133,7 +133,7 @@
 
     /**
      * Returns true if next event should be processed immediately.
-     * @return 
+     * @return {@code true} if the next event should be processed immediately, {@code false} otherwise
      */
     private boolean pollNext() {
         if (currFlowContext != null) {
--- a/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java	Mon Jan 15 09:23:05 2018 +0100
+++ b/stress-tester/src/test/java/com/passus/st/client/http/HttpSynchClientWorkerTest.java	Mon Jan 15 12:26:11 2018 +0100
@@ -3,10 +3,10 @@
 import com.passus.config.Configuration;
 import com.passus.data.ByteBuff;
 import com.passus.data.HeapByteBuff;
+import com.passus.net.SocketAddress;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseEncoder;
-import com.passus.st.Log4jConfigurationFactory;
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionEvent;
 import com.passus.st.client.SessionStatusEvent;
@@ -17,77 +17,31 @@
 import com.passus.st.emitter.PassThroughSessionMapper;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
-import com.passus.st.emitter.nio.NioChannelContext;
-import com.passus.st.emitter.nio.NioEmitterWorker;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.utils.EventUtils;
 import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-import org.apache.logging.log4j.Level;
-import static org.testng.AssertJUnit.*;
+import java.util.Queue;
 import org.testng.annotations.Test;
+import static com.passus.st.utils.Assert.*;
+import org.testng.annotations.AfterMethod;
 
 /**
  *
  * @author mikolaj.podbielski
+ * @author miroslaw.hawrot
  */
 public class HttpSynchClientWorkerTest {
 
-    private static class LocalEmitterWorker extends NioEmitterWorker {
-
-        public LocalEmitterWorker(int index) throws IOException {
-            super(index);
-        }
-
-        @Override
-        public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public void setWorking(boolean b) {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        protected void flush(SelectionKey key) {
-            KeyContext keyContext = (KeyContext) key.attachment();
-            SessionInfo sessionInfo = keyContext.channelContext.getSessionInfo();
-            HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) keyContext.handler;
-            HttpFlowContext flowContext = clientWorker.flowContext(sessionInfo);
-            HttpSessionPayloadEvent event = flowContext.sentEvent;
-            HttpRequest request = event.getRequest();
-            HttpResponse response = event.getResponse();
-            ByteBuff buff = new HeapByteBuff();
-            HttpResponseEncoder.INSTANCE.encode(response, buff);
-            try {
-                keyContext.handler.dataReceived(keyContext.channelContext, buff);
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-            System.out.println("flush");
-        }
-
-        @Override
-        protected void requestClose(SelectionKey key) {
-            System.out.println("requestClose");
-        }
-
-        @Override
-        public void writeMetrics(MetricsContainer container) {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-    }
+    private final TestHttpClientListener listner = new TestHttpClientListener();
 
     private static class LocalEmitter implements Emitter {
 
         private SessionMapper sessionMapper = new PassThroughSessionMapper();
+
         private boolean started = false;
 
         @Override
@@ -102,20 +56,36 @@
 
         @Override
         public void connect(SessionInfo session, EmitterHandler handler, int workerIndex) throws IOException {
-            System.out.println("Emitter connect");
-            System.out.println(session);
-            System.out.println(handler);
-            // doConnect -> channelRegistered
-            // selector
-            // doFinishConnect -> channelActive
-            NioEmitterWorker worker = new LocalEmitterWorker(workerIndex);
-            NioChannelContext channelContext = new NioChannelContext(worker, null, null, session);
+            LocalChannelContext channelContext = new LocalChannelContext(this, handler, null, session);
             try {
                 handler.channelRegistered(channelContext);
                 handler.channelActive(channelContext);
-                KeyContext keyContext = new KeyContext(channelContext, handler);
-                SelectionKey key = selectionKey(keyContext);
-                channelContext.selectionKey(key);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        /** Encodes the response of the flow context's {@code sentEvent} and passes it to the worker's dataReceived. */
+        protected void flush(LocalChannelContext channelContext) {
+            HttpFlowBasedClientWorker clientWorker = (HttpFlowBasedClientWorker) channelContext.handler;
+            HttpFlowContext flowContext = clientWorker.flowContext(channelContext.getSessionInfo());
+            HttpSessionPayloadEvent event = flowContext.sentEvent;
+
+            // Only the response side of the event is used; the request is not needed here.
+            HttpResponse response = event.getResponse();
+            ByteBuff buff = new HeapByteBuff();
+            HttpResponseEncoder.INSTANCE.encode(response, buff);
+            try {
+                clientWorker.dataReceived(channelContext, buff);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+        }
+
+        protected void close(LocalChannelContext channelContext) {
+            try {
+                channelContext.handler.channelInactive(channelContext);
+                channelContext.handler.channelUnregistered(channelContext);
             } catch (Exception ex) {
                 ex.printStackTrace();
             }
@@ -154,35 +124,95 @@
         }
     }
 
-    private static class KeyContext {
+    private static class LocalChannelContext implements ChannelContext {
+
+        private final LocalEmitter emitter;
 
         private final EmitterHandler handler;
 
-        private final ChannelContext channelContext;
+        private final SessionInfo sessionInfo;
 
-        public KeyContext(ChannelContext channelContext, EmitterHandler handler) {
-            this.channelContext = channelContext;
+        private final Queue<ByteBuffer> dataQueue;
+
+        private SocketAddress localAddress;
+
+        private SocketAddress remoteAddress;
+
+        public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+            this.emitter = emitter;
             this.handler = handler;
+            this.remoteAddress = remoteAddress;
+            this.sessionInfo = sessionInfo;
+            this.dataQueue = new LinkedList<>();
+        }
+
+        @Override
+        public boolean isConnected() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        @Override
+        public boolean isConnectionPending() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        /**
+         * Appends the given slice of {@code data}, wrapped as a {@link ByteBuffer}, to the data queue.
+         */
+        @Override
+        public void write(byte[] data, int offset, int length) throws IOException {
+            dataQueue.add(ByteBuffer.wrap(data, offset, length));
+        }
+
+        /** Appends {@code data.toNioByteBuffer()} to the data queue. */
+        @Override
+        public void write(ByteBuff data) throws IOException {
+            dataQueue.add(data.toNioByteBuffer());
+        }
+
+        @Override
+        public void flush() throws IOException {
+            emitter.flush(this);
+        }
+
+        @Override
+        public void close() throws IOException {
+            emitter.close(this);
+        }
+
+        @Override
+        public SocketAddress getLocalAddress() {
+            return localAddress;
+        }
+
+        @Override
+        public SocketAddress getRemoteAddress() {
+            return remoteAddress;
+        }
+
+        @Override
+        public SessionInfo getSessionInfo() {
+            return sessionInfo;
         }
 
     }
 
-    // wisi w pętli pollNext
-    @Test(enabled = false)
-    public void testHandle1() throws Exception {
-        Log4jConfigurationFactory.enableFactory(Level.DEBUG);
+    @AfterMethod
+    public void afterMethod() {
+        listner.clear();
+    }
 
+    @Test(enabled = true)
+    public void testHandle_SimpleRequestResponse() throws Exception {
         Properties props = new Properties();
         props.put("allowPartialSession", "true");
         props.put("ports", "4214");
         List<Event> events = EventUtils.readEvents("pcap/http/http_req_resp.pcap", props);
         assertEquals(4, events.size());
 
-        TestHttpClientListener listner = new TestHttpClientListener();
         LocalEmitter emitter = new LocalEmitter();
         HttpSynchClientWorker worker = new HttpSynchClientWorker(emitter, "test", 0);
-        worker.setListeners(Arrays.asList(listner));
-        //worker.setConnectPartialSession(true);
+        worker.addListener(listner);
 
         worker.start();
         SessionEvent sessionEvent = (SessionEvent) events.get(0);
@@ -190,51 +220,7 @@
         events.forEach(worker::handle);
         worker.join();
 
-        System.out.println("");
-    }
-
-    private static SelectionKey selectionKey(Object attachment) {
-        SelectionKey key = new XSelectionKey();
-        key.attach(attachment);
-        return key;
+        assertHttpClientEvents(events, listner.events());
     }
 
-    private static class XSelectionKey extends SelectionKey {
-
-        @Override
-        public SelectableChannel channel() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public Selector selector() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public boolean isValid() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public void cancel() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public int interestOps() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public SelectionKey interestOps(int ops) {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-        @Override
-        public int readyOps() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
-        }
-
-    }
 }
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Mon Jan 15 09:23:05 2018 +0100
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Mon Jan 15 12:26:11 2018 +0100
@@ -4,15 +4,12 @@
 import com.passus.data.PooledAllocator;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
-import com.passus.st.client.http.HttpSessionPayloadEvent;
 import com.passus.st.utils.EventUtils;
-import static com.passus.st.utils.HttpMessageAssert.assertMessages;
-import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
-import static org.testng.AssertJUnit.*;
+import static com.passus.st.utils.Assert.*;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -48,25 +45,6 @@
         };
     }
 
-    private void assertEvents(FileEvents expectedEvents, List<Event> events) {
-        assertEquals(expectedEvents.events.size(), events.size());
-        for (int i = 0; i < events.size(); i++) {
-            Event expectedEvent = expectedEvents.events.get(i);
-            Event event = events.get(i);
-
-            assertEquals(expectedEvent.getType(), event.getType());
-            if (event.getType() == HttpSessionPayloadEvent.TYPE) {
-                HttpSessionPayloadEvent expectedPayloadEvent = (HttpSessionPayloadEvent) expectedEvent;
-                HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
-
-                assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
-                assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
-                assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
-                assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
-            }
-        }
-    }
-
     @Test(dataProvider = "pcapFiles")
     public void testRead(String pcapFile) throws Exception {
         FileEvents fileEvents = writeEvents(pcapFile);
@@ -78,7 +56,7 @@
             eventSource.start();
 
             List<Event> events = handler.getEvents();
-            assertEvents(fileEvents, events);
+            assertEvents(fileEvents.events, events);
         } finally {
             fileEvents.ncFile.delete();
         }
@@ -97,7 +75,7 @@
             eventSource.start();
 
             List<Event> events = handler.getEvents();
-            assertEvents(fileEvents, events);
+            assertEvents(fileEvents.events, events);
             assertEquals(2, allocator.usedSize());
         } finally {
             fileEvents.ncFile.delete();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Mon Jan 15 09:23:05 2018 +0100
+++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Mon Jan 15 12:26:11 2018 +0100
@@ -3,6 +3,19 @@
 import com.passus.data.ByteString;
 import com.passus.data.ByteStringImpl;
 import com.passus.data.SliceByteString;
+import com.passus.st.client.Event;
+import com.passus.st.client.TestHttpClientListener;
+import com.passus.st.client.TestHttpClientListener.HttpClientEvent;
+import com.passus.st.client.TestHttpClientListener.HttpClientEventType;
+import com.passus.st.client.TestHttpClientListener.ResponseReceivedEvent;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.source.NcEventSourceTest;
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
+import static com.passus.st.utils.HttpMessageAssert.assertMessagesContent;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.testng.AssertJUnit;
 import static org.testng.AssertJUnit.assertEquals;
 
@@ -27,4 +40,61 @@
         assertEquals(null, expectedBs, actual);
     }
 
+    /**
+     * Compares the payload events found in {@code expectedEvents} with the responses in {@code httpClientEvents}.
+     * <p>
+     * Only expected events whose type is {@link HttpSessionPayloadEvent#TYPE} are kept. Each
+     * {@code RESPONSE_RECEIVED} client event is turned into a new {@link HttpSessionPayloadEvent} built
+     * from its request and response, and both lists are then passed to {@link #assertEvents(List, List)}.
+     *
+     * @param expectedEvents events to compare against; non-payload events are ignored
+     * @param httpClientEvents client events; only {@code RESPONSE_RECEIVED} ones are compared
+     */
+    public static void assertHttpClientEvents(List<Event> expectedEvents, Collection<HttpClientEvent> httpClientEvents) {
+        List<Event> expectedPayloads = expectedEvents.stream()
+                .filter(candidate -> candidate.getType() == HttpSessionPayloadEvent.TYPE)
+                .collect(Collectors.toList());
+
+        List<Event> receivedPayloads = new ArrayList<>(httpClientEvents.size());
+        for (HttpClientEvent clientEvent : httpClientEvents) {
+            if (clientEvent.getType() != HttpClientEventType.RESPONSE_RECEIVED) {
+                continue;
+            }
+
+            ResponseReceivedEvent received = (ResponseReceivedEvent) clientEvent;
+            receivedPayloads.add(new HttpSessionPayloadEvent(null, received.getRequest(), received.getResponse(), null));
+        }
+
+        assertEvents(expectedPayloads, receivedPayloads);
+    }
+
+    /**
+     * Asserts that both lists have the same size and that events at the same index have the same type.
+     * <p>
+     * For events whose type is {@link HttpSessionPayloadEvent#TYPE}, the requests and the responses are
+     * additionally compared with {@code assertMessages} and {@code assertMessagesContent}.
+     *
+     * @param expectedEvents reference events
+     * @param events events under test
+     */
+    public static void assertEvents(List<Event> expectedEvents, List<Event> events) {
+        assertEquals(expectedEvents.size(), events.size());
+        for (int index = 0; index < events.size(); index++) {
+            Event expected = expectedEvents.get(index);
+            Event actual = events.get(index);
+
+            assertEquals(expected.getType(), actual.getType());
+            if (actual.getType() != HttpSessionPayloadEvent.TYPE) {
+                continue;
+            }
+
+            HttpSessionPayloadEvent expectedPayload = (HttpSessionPayloadEvent) expected;
+            HttpSessionPayloadEvent actualPayload = (HttpSessionPayloadEvent) actual;
+            assertMessages(expectedPayload.getRequest(), actualPayload.getRequest());
+            assertMessagesContent(expectedPayload.getRequest(), actualPayload.getRequest());
+            assertMessages(expectedPayload.getResponse(), actualPayload.getResponse());
+            assertMessagesContent(expectedPayload.getResponse(), actualPayload.getResponse());
+        }
+    }
+
 }