changeset 1024:14adc04b1b8d

SocketEmitter bugfixes
author Devel 2
date Wed, 01 Apr 2020 10:52:41 +0200
parents 831da81c262f
children c0bf1f5ed2ee
files stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/emitter/AbstractEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java
diffstat 3 files changed, 185 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Tue Mar 31 11:55:55 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Apr 01 10:52:41 2020 +0200
@@ -9,8 +9,8 @@
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class SocketEmitter implements Emitter {
@@ -27,7 +27,7 @@
 
     private boolean collectMetrics = DEFAULT_COLLECT_METRICS;
 
-    private Map<SessionInfo, Connection> connections = new HashMap<>();
+    private Map<SessionInfo, Connection> connections = new ConcurrentHashMap<>();
 
     private int maxThreads = DEFAULT_NUM_THREADS;
 
@@ -123,25 +123,7 @@
 
                 @Override
                 public void onConnectionClosed(ChannelContext channelContext) {
-                    synchronized (SocketEmitter.this) {
-                        try {
-                            handler.channelInactive(channelContext);
-                        } catch (Exception e) {
-                            LOGGER.debug(e.getMessage(), e);
-                        }
-
-                        connections.remove(channelContext.getSessionInfo());
-
-                        try {
-                            handler.channelUnregistered(channelContext);
-                        } catch (Exception e) {
-                            LOGGER.debug(e.getMessage(), e);
-                        }
-
-                        if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
-                        }
-                    }
+                    connections.remove(channelContext.getSessionInfo());
                 }
 
             };
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/AbstractEmitterTest.java	Wed Apr 01 10:52:41 2020 +0200
@@ -0,0 +1,164 @@
+package com.passus.st.emitter;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.http.HttpConsts;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpRequestBuilder;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.st.AbstractWireMockTest;
+import com.passus.st.client.TestClientHandler;
+import com.passus.st.client.TestClientHandler.EventType;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+import java.net.ConnectException;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public abstract class AbstractEmitterTest<T extends Emitter> extends AbstractWireMockTest {
+
+    protected void waitConn(TestClientHandler handler, int expectedEventsSize) {
+        waitConn(handler, expectedEventsSize, 5_000);
+    }
+
+    protected void waitConn(TestClientHandler handler, int expectedEventsSize, long timeout) {
+        long endTime = System.currentTimeMillis() + timeout;
+        if (endTime <= 0) {
+            endTime = Long.MAX_VALUE;
+        }
+
+        while (true) {
+            if (handler.size() >= expectedEventsSize
+                    || handler.isChannelUnregistered()) {
+                return;
+            } else if (System.currentTimeMillis() >= endTime) {
+                throw new RuntimeException("Timeout");
+            }
+
+            try {
+                Thread.sleep(50);
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    public abstract T createEmitter();
+
+    @Test(enabled = true)
+    public void testConnectAndClose() throws Exception {
+        T emitter = createEmitter();
+        try {
+            emitter.start();
+            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port());
+
+            TestClientHandler handler = new TestClientHandler() {
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    context.close();
+                }
+            };
+
+            emitter.connect(info, handler, 0);
+            waitConn(handler, 4);
+
+            AssertJUnit.assertEquals(4, handler.size());
+            AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType());
+            AssertJUnit.assertEquals(EventType.CHANNEL_ACTIVE, handler.get(1).getType());
+            AssertJUnit.assertEquals(EventType.CHANNEL_INACTIVE, handler.get(2).getType());
+            AssertJUnit.assertEquals(EventType.CHANNEL_UNREGISTERED, handler.get(3).getType());
+        } finally {
+            emitter.stop();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testWriteAndRead() throws Exception {
+        String path = "/some/thing";
+        String url = "http://localhost" + path;
+        HttpRequest req = HttpRequestBuilder.get(url).version(HttpConsts.VERSION_1_0).build();
+        String content = "Hello world!";
+        stubFor(get(urlEqualTo("/some/thing"))
+                .willReturn(aResponse()
+                        .withHeader("Content-Type", "text/plain")
+                        .withHeader("Content-Length", "" + content.length())
+                        .withBody(content)));
+
+        T emitter = createEmitter();
+        emitter.start();
+        try {
+            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port());
+
+            TestClientHandler handler = new TestClientHandler() {
+
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    ByteBuff buff = new HeapByteBuff();
+                    HttpRequestEncoder.getInstance().encode(req, buff);
+                    context.writeAndFlush(buff);
+                }
+
+                @Override
+                protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception {
+                    if (data.toString().contains("!")) {
+                        context.close();
+                    }
+                }
+
+            };
+
+            emitter.connect(info, handler, 0);
+            waitConn(handler, 20, 5_000);
+
+            assertTrue(handler.size() >= 6);
+            int index = 0;
+            assertEquals(EventType.CHANNEL_REGISTERED, handler.get(index++).getType());
+            assertEquals(EventType.CHANNEL_ACTIVE, handler.get(index++).getType());
+            assertEquals(EventType.DATA_WRITTEN, handler.get(index++).getType());
+
+            ByteBuff resContentBuff = new HeapByteBuff();
+
+            do {
+                TestClientHandler.ClientEvent event = handler.get(index++);
+                assertEquals(EventType.DATA_RECEIVED, event.getType());
+                resContentBuff.append(event.getData());
+            } while (handler.get(index).getType() == EventType.DATA_RECEIVED);
+
+            assertEquals(EventType.CHANNEL_INACTIVE, handler.get(index++).getType());
+            assertEquals(EventType.CHANNEL_UNREGISTERED, handler.get(index++).getType());
+            String responseContent = resContentBuff.toString();
+            assertTrue(responseContent.endsWith(content));
+        } finally {
+            emitter.stop();
+        }
+    }
+
+    @Test(enabled = true)
+    public void testError_ConnectionRefused() throws Exception {
+        T emitter = createEmitter();
+        emitter.start();
+        try {
+            SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, 10_000);
+
+            TestClientHandler handler = new TestClientHandler() {
+                @Override
+                protected void doChannelActive(ChannelContext context) throws Exception {
+                    context.close();
+                }
+            };
+
+            emitter.connect(info, handler, 0);
+            waitConn(handler, 2);
+
+            AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType());
+            TestClientHandler.ClientEvent clientEvent = handler.get(1);
+            AssertJUnit.assertEquals(EventType.ERROR_OCCURRED, clientEvent.getType());
+            AssertJUnit.assertTrue(clientEvent.getCause() instanceof ConnectException);
+            AssertJUnit.assertTrue(clientEvent.getCause().getMessage().startsWith("Connection refused"));
+        } finally {
+            emitter.stop();
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java	Wed Apr 01 10:52:41 2020 +0200
@@ -0,0 +1,18 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.st.emitter.AbstractEmitterTest;
+import com.passus.st.emitter.Emitter;
+import org.testng.annotations.Test;
+
+public class SocketEmitterTest extends AbstractEmitterTest {
+
+    @Override
+    public Emitter createEmitter() {
+        return new SocketEmitter();
+    }
+
+    /*@Test(enabled = false)
+    public void testWriteAndRead() throws Exception {
+
+    }*/
+}
\ No newline at end of file