Mercurial > stress-tester
changeset 1026:1921cac1f89e
Socket Connection bugfix + minors
author | Devel 2 |
---|---|
date | Wed, 01 Apr 2020 11:04:20 +0200 |
parents | c0bf1f5ed2ee |
children | 1e0034fb17dd |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java |
diffstat | 4 files changed, 14 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Wed Apr 01 11:04:20 2020 +0200
@@ -16,7 +16,7 @@
 
     protected final Connection connection;
 
-    private boolean bidirectional;
+    private boolean bidirectional = true;
 
     private final Queue<byte[]> dataQueue;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed Apr 01 11:04:20 2020 +0200
@@ -10,14 +10,13 @@
 
 import java.io.IOException;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 public abstract class Connection extends Thread {
 
     protected final Logger logger = LogManager.getLogger(getClass());
 
-    protected final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
+    protected final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
 
     boolean working = true;
 
@@ -73,6 +72,10 @@
                 write(data);
             }
         }
+
+        if (channelContext.isBidirectional()) {
+            tasks.add(StatelessTasks.READ_TASK);
+        }
     }
 
     private void write(byte[] data) {
@@ -84,9 +87,13 @@
 
     private void doRead() {
         buffer.clear();
-        int totalReaded = 0;
+        int totalRead = 0;
         try {
-            read(buffer);
+            int read;
+            while ((read = read(buffer)) != -1) {
+                totalRead += read;
+            }
+
         } catch (IOException e) {
             doCatchException(channelContext, e);
             doClose();
@@ -95,11 +102,11 @@
 
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Readed {}B ({} -> {})", totalReaded,
+            logger.debug("Readed {}B ({} -> {})", buffer.readableBytes(),
                     channelContext.getLocalAddress(), channelContext.getRemoteAddress());
         }
 
-        if (totalReaded > 0) {
+        if (buffer.readableBytes() > 0) {
             try {
                 handler.dataReceived(channelContext, buffer);
                 logger.debug("Read handled.");
@@ -149,7 +156,6 @@
         while (working) {
             synchronized (this) {
                 Task task = tasks.poll();
-
                 if (task != null) {
                     switch (task.code) {
                         case Task.CONNECT:
--- a/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java Wed Apr 01 11:04:20 2020 +0200
@@ -1,16 +1,7 @@
 package com.passus.st.emitter.nio;
 
-import com.passus.st.client.TestClientHandler;
-import com.passus.st.client.TestClientHandler.ClientEvent;
-import com.passus.st.client.TestClientHandler.EventType;
 import com.passus.st.emitter.AbstractEmitterTest;
-import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import java.net.ConnectException;
 
 /**
  * @author Mirosław Hawrot
@@ -24,5 +15,4 @@
         return emitter;
     }
 
-
 }
--- a/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java Wed Apr 01 11:04:20 2020 +0200
@@ -2,7 +2,6 @@
 
 import com.passus.st.emitter.AbstractEmitterTest;
 import com.passus.st.emitter.Emitter;
-import org.testng.annotations.Test;
 
 public class SocketEmitterTest extends AbstractEmitterTest {
 
@@ -11,8 +10,4 @@
         return new SocketEmitter();
     }
 
-    /*@Test(enabled = false)
-    public void testWriteAndRead() throws Exception {
-
-    }*/
 }
\ No newline at end of file