Mercurial > stress-tester
changeset 1024:14adc04b1b8d
SocketEmitter bugfixes
author | Devel 2 |
---|---|
date | Wed, 01 Apr 2020 10:52:41 +0200 |
parents | 831da81c262f |
children | c0bf1f5ed2ee |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java stress-tester/src/test/java/com/passus/st/emitter/AbstractEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java |
diffstat | 3 files changed, 185 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Tue Mar 31 11:55:55 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Apr 01 10:52:41 2020 +0200 @@ -9,8 +9,8 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class SocketEmitter implements Emitter { @@ -27,7 +27,7 @@ private boolean collectMetrics = DEFAULT_COLLECT_METRICS; - private Map<SessionInfo, Connection> connections = new HashMap<>(); + private Map<SessionInfo, Connection> connections = new ConcurrentHashMap<>(); private int maxThreads = DEFAULT_NUM_THREADS; @@ -123,25 +123,7 @@ @Override public void onConnectionClosed(ChannelContext channelContext) { - synchronized (SocketEmitter.this) { - try { - handler.channelInactive(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - connections.remove(channelContext.getSessionInfo()); - - try { - handler.channelUnregistered(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); - } - } + connections.remove(channelContext.getSessionInfo()); } };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/AbstractEmitterTest.java Wed Apr 01 10:52:41 2020 +0200 @@ -0,0 +1,164 @@ +package com.passus.st.emitter; + +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.http.HttpConsts; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpRequestBuilder; +import com.passus.net.http.HttpRequestEncoder; +import com.passus.st.AbstractWireMockTest; +import com.passus.st.client.TestClientHandler; +import com.passus.st.client.TestClientHandler.EventType; +import org.testng.AssertJUnit; +import org.testng.annotations.Test; + +import java.net.ConnectException; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +public abstract class AbstractEmitterTest<T extends Emitter> extends AbstractWireMockTest { + + protected void waitConn(TestClientHandler handler, int expectedEventsSize) { + waitConn(handler, expectedEventsSize, 5_000); + } + + protected void waitConn(TestClientHandler handler, int expectedEventsSize, long timeout) { + long endTime = System.currentTimeMillis() + timeout; + if (endTime <= 0) { + endTime = Long.MAX_VALUE; + } + + while (true) { + if (handler.size() >= expectedEventsSize + || handler.isChannelUnregistered()) { + return; + } else if (System.currentTimeMillis() >= endTime) { + throw new RuntimeException("Timeout"); + } + + try { + Thread.sleep(50); + } catch (Exception ignore) { + } + } + } + + public abstract T createEmitter(); + + @Test(enabled = true) + public void testConnectAndClose() throws Exception { + T emitter = createEmitter(); + try { + emitter.start(); + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port()); + + TestClientHandler handler = new TestClientHandler() { + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.close(); + } + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 4); + + AssertJUnit.assertEquals(4, handler.size()); + AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + AssertJUnit.assertEquals(EventType.CHANNEL_ACTIVE, handler.get(1).getType()); + AssertJUnit.assertEquals(EventType.CHANNEL_INACTIVE, handler.get(2).getType()); + AssertJUnit.assertEquals(EventType.CHANNEL_UNREGISTERED, handler.get(3).getType()); + } finally { + emitter.stop(); + } + } + + @Test(enabled = true) + public void testWriteAndRead() throws Exception { + String path = "/some/thing"; + String url = "http://localhost" + path; + HttpRequest req = HttpRequestBuilder.get(url).version(HttpConsts.VERSION_1_0).build(); + String content = "Hello world!"; + stubFor(get(urlEqualTo("/some/thing")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withHeader("Content-Length", "" + content.length()) + .withBody(content))); + + T emitter = createEmitter(); + emitter.start(); + try { + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port()); + + TestClientHandler handler = new TestClientHandler() { + + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + ByteBuff buff = new HeapByteBuff(); + HttpRequestEncoder.getInstance().encode(req, buff); + context.writeAndFlush(buff); + } + + @Override + protected void doDataReceived(ChannelContext context, ByteBuff data) throws Exception { + if (data.toString().contains("!")) { + context.close(); + } + } + + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 20, 5_000); + + assertTrue(handler.size() >= 6); + int index = 0; + assertEquals(EventType.CHANNEL_REGISTERED, handler.get(index++).getType()); + assertEquals(EventType.CHANNEL_ACTIVE, handler.get(index++).getType()); + assertEquals(EventType.DATA_WRITTEN, handler.get(index++).getType()); + + ByteBuff resContentBuff = new HeapByteBuff(); + + do { + TestClientHandler.ClientEvent event = handler.get(index++); + assertEquals(EventType.DATA_RECEIVED, event.getType()); + resContentBuff.append(event.getData()); + } while (handler.get(index).getType() == EventType.DATA_RECEIVED); + + assertEquals(EventType.CHANNEL_INACTIVE, handler.get(index++).getType()); + assertEquals(EventType.CHANNEL_UNREGISTERED, handler.get(index++).getType()); + String responseContent = resContentBuff.toString(); + assertTrue(responseContent.endsWith(content)); + } finally { + emitter.stop(); + } + } + + @Test(enabled = true) + public void testError_ConnectionRefused() throws Exception { + T emitter = createEmitter(); + emitter.start(); + try { + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, 10_000); + + TestClientHandler handler = new TestClientHandler() { + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.close(); + } + }; + + emitter.connect(info, handler, 0); + waitConn(handler, 2); + + AssertJUnit.assertEquals(EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + TestClientHandler.ClientEvent clientEvent = handler.get(1); + AssertJUnit.assertEquals(EventType.ERROR_OCCURRED, clientEvent.getType()); + AssertJUnit.assertTrue(clientEvent.getCause() instanceof ConnectException); + AssertJUnit.assertTrue(clientEvent.getCause().getMessage().startsWith("Connection refused")); + } finally { + emitter.stop(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java Wed Apr 01 10:52:41 2020 +0200 @@ -0,0 +1,18 @@ +package com.passus.st.emitter.socket; + +import com.passus.st.emitter.AbstractEmitterTest; +import com.passus.st.emitter.Emitter; +import org.testng.annotations.Test; + +public class SocketEmitterTest extends AbstractEmitterTest { + + @Override + public Emitter createEmitter() { + return new SocketEmitter(); + } + + /*@Test(enabled = false) + public void testWriteAndRead() throws Exception { + + }*/ +} \ No newline at end of file