changeset 980:8c03beba3161

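SocketEmitter in progress: replace the flag + wait/notify signalling in Connection with a BlockingQueue of tasks (CONNECT, FLUSH, CLOSE)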
author Devel 2
date Wed, 31 Jul 2019 10:34:10 +0200
parents 700a3005eaf8
children da4a9beebbd3
files stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 3 files changed, 56 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Mon Jul 29 15:47:44 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Wed Jul 31 10:34:10 2019 +0200
@@ -9,6 +9,9 @@
 import java.util.LinkedList;
 import java.util.Queue;
 
+import static com.passus.st.emitter.socket.Connection.CLOSE_TASK;
+import static com.passus.st.emitter.socket.Connection.FLUSH_TASK;
+
 public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
 
     private final Connection connection;
@@ -66,17 +69,11 @@
     }
 
     public void flush() throws IOException {
-        synchronized (connection) {
-            connection.flag = Connection.FLAG_FLUSH;
-            connection.notify();
-        }
+        connection.tasks.add(FLUSH_TASK);
     }
 
     public void close() throws IOException {
-        synchronized (connection) {
-            connection.flag = Connection.FLAG_CLOSE;
-            connection.notify();
-        }
+        connection.tasks.add(CLOSE_TASK);
     }
 
     @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jul 29 15:47:44 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Jul 31 10:34:10 2019 +0200
@@ -9,20 +9,18 @@
 import org.apache.logging.log4j.Logger;
 
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public abstract class Connection extends Thread {
 
     protected final Logger logger = LogManager.getLogger(getClass());
 
-    static final int FLAG_CONNECT = 1;
-    static final int FLAG_FLUSH = 1 << 1;
-    static final int FLAG_READ = 1 << 2;
-    static final int FLAG_CLOSE = 1 << 3;
+    protected final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
 
     boolean working = true;
 
-    int flag = FLAG_CONNECT;
-
     final SessionInfo sessionInfo;
 
     final EmitterHandler handler;
@@ -105,41 +103,53 @@
 
     @Override
     public void run() {
-        connect();
         while (working) {
             synchronized (this) {
-                if (flag != 0) {
-                    try {
-                        wait(1);
-                    } catch (InterruptedException ignore) {
-                    }
-                }
+                Task task  = tasks.poll();
 
-                switch (flag) {
-                    case FLAG_FLUSH:
-                        Queue<byte[]> queue = channelContext.dataQueue();
-                        if (!queue.isEmpty()) {
-                            byte[] data;
-                            while ((data = queue.poll()) != null) {
-                                write(data);
+                if (task != null) {
+                    switch (task.code) {
+                        case Task.CONNECT:
+                            connect();
+                            break;
+                        case Task.FLUSH:
+                            Queue<byte[]> queue = channelContext.dataQueue();
+                            if (!queue.isEmpty()) {
+                                byte[] data;
+                                while ((data = queue.poll()) != null) {
+                                    write(data);
+                                }
                             }
-                        }
-
-                        flag = 0;
-                        break;
-                    case FLAG_READ:
-                        break;
-                    case FLAG_CLOSE:
-                        close();
-                        if (listener != null) {
-                            listener.onConnectionClosed(channelContext);
-                        }
-                        working = false;
-                        flag = 0;
-                        break;
+                            break;
+                        case Task.CLOSE:
+                            close();
+                            if (listener != null) {
+                                listener.onConnectionClosed(channelContext);
+                            }
+                            working = false;
+                            break;
+                    }
                 }
             }
         }
+    }
+
+    protected static class Task {
+
+        public static final int CONNECT = 1;
+        public static final int FLUSH = 2;
+        public static final int CLOSE = 3;
+
+        final int code;
+
+        public Task(int code) {
+            this.code = code;
+        }
 
     }
+
+    static final Task CONNECT_TASK = new Task(Task.CONNECT);
+    static final Task FLUSH_TASK = new Task(Task.FLUSH);
+    static final Task CLOSE_TASK = new Task(Task.CLOSE);
+
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Mon Jul 29 15:47:44 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Jul 31 10:34:10 2019 +0200
@@ -13,8 +13,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static com.passus.st.emitter.socket.Connection.FLAG_CLOSE;
-
 @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class SocketEmitter implements Emitter {
 
@@ -64,7 +62,6 @@
         return started;
     }
 
-
     @Override
     public void start() {
         synchronized (this) {
@@ -89,7 +86,11 @@
 
             connections.forEach((sessionInfo, conn) -> {
                 if (conn.isConnected()) {
-                    conn.flag = FLAG_CLOSE;
+                    try {
+                        conn.tasks.put(Connection.CLOSE_TASK);
+                    } catch (InterruptedException e) {
+
+                    }
                 }
 
                 conn.working = false;
@@ -143,6 +144,8 @@
                     }
 
                 });
+
+                connection.tasks.add(Connection.CONNECT_TASK);
                 connection.start();
                 connections.put(sessionInfo, connection);
             }