changeset 1026:1921cac1f89e

Socket Connection bugfix + minor cleanups
author Devel 2
date Wed, 01 Apr 2020 11:04:20 +0200
parents c0bf1f5ed2ee
children 1e0034fb17dd
files stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java
diffstat 4 files changed, 14 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Wed Apr 01 11:04:20 2020 +0200
@@ -16,7 +16,7 @@
 
     protected final Connection connection;
 
-    private boolean bidirectional;
+    private boolean bidirectional = true;
 
     private final Queue<byte[]> dataQueue;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Apr 01 11:04:20 2020 +0200
@@ -10,14 +10,13 @@
 
 import java.io.IOException;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 public abstract class Connection extends Thread {
 
     protected final Logger logger = LogManager.getLogger(getClass());
 
-    protected final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
+    protected final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
 
     boolean working = true;
 
@@ -73,6 +72,10 @@
                 write(data);
             }
         }
+
+        if (channelContext.isBidirectional()) {
+            tasks.add(StatelessTasks.READ_TASK);
+        }
     }
 
     private void write(byte[] data) {
@@ -84,9 +87,13 @@
     private void doRead() {
         buffer.clear();
 
-        int totalReaded = 0;
+        int totalRead = 0;
         try {
-            read(buffer);
+            int read;
+            while ((read = read(buffer)) != -1) {
+                totalRead += read;
+            }
+
         } catch (IOException e) {
             doCatchException(channelContext, e);
             doClose();
@@ -95,11 +102,11 @@
 
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Readed {}B ({} -> {})", totalReaded,
+            logger.debug("Readed {}B ({} -> {})", buffer.readableBytes(),
                     channelContext.getLocalAddress(), channelContext.getRemoteAddress());
         }
 
-        if (totalReaded > 0) {
+        if (buffer.readableBytes() > 0) {
             try {
                 handler.dataReceived(channelContext, buffer);
                 logger.debug("Read handled.");
@@ -149,7 +156,6 @@
         while (working) {
             synchronized (this) {
                 Task task = tasks.poll();
-
                 if (task != null) {
                     switch (task.code) {
                         case Task.CONNECT:
--- a/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java	Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/nio/NioEmitterTest.java	Wed Apr 01 11:04:20 2020 +0200
@@ -1,16 +1,7 @@
 package com.passus.st.emitter.nio;
 
-import com.passus.st.client.TestClientHandler;
-import com.passus.st.client.TestClientHandler.ClientEvent;
-import com.passus.st.client.TestClientHandler.EventType;
 import com.passus.st.emitter.AbstractEmitterTest;
-import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.SessionInfo;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import java.net.ConnectException;
 
 /**
  * @author Mirosław Hawrot
@@ -24,5 +15,4 @@
         return emitter;
     }
 
-
 }
--- a/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java	Wed Apr 01 10:57:59 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/socket/SocketEmitterTest.java	Wed Apr 01 11:04:20 2020 +0200
@@ -2,7 +2,6 @@
 
 import com.passus.st.emitter.AbstractEmitterTest;
 import com.passus.st.emitter.Emitter;
-import org.testng.annotations.Test;
 
 public class SocketEmitterTest extends AbstractEmitterTest {
 
@@ -11,8 +10,4 @@
         return new SocketEmitter();
     }
 
-    /*@Test(enabled = false)
-    public void testWriteAndRead() throws Exception {
-
-    }*/
 }
\ No newline at end of file