Mercurial > stress-tester
changeset 980:8c03beba3161
SocketEmitter in progress
author | Devel 2 |
---|---|
date | Wed, 31 Jul 2019 10:34:10 +0200 |
parents | 700a3005eaf8 |
children | da4a9beebbd3 |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 3 files changed, 56 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Mon Jul 29 15:47:44 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Wed Jul 31 10:34:10 2019 +0200 @@ -9,6 +9,9 @@ import java.util.LinkedList; import java.util.Queue; +import static com.passus.st.emitter.socket.Connection.CLOSE_TASK; +import static com.passus.st.emitter.socket.Connection.FLUSH_TASK; + public abstract class AbstractChannelContext<K> implements ChannelContext<K> { private final Connection connection; @@ -66,17 +69,11 @@ } public void flush() throws IOException { - synchronized (connection) { - connection.flag = Connection.FLAG_FLUSH; - connection.notify(); - } + connection.tasks.add(FLUSH_TASK); } public void close() throws IOException { - synchronized (connection) { - connection.flag = Connection.FLAG_CLOSE; - connection.notify(); - } + connection.tasks.add(CLOSE_TASK); } @Override
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jul 29 15:47:44 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed Jul 31 10:34:10 2019 +0200 @@ -9,20 +9,18 @@ import org.apache.logging.log4j.Logger; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public abstract class Connection extends Thread { protected final Logger logger = LogManager.getLogger(getClass()); - static final int FLAG_CONNECT = 1; - static final int FLAG_FLUSH = 1 << 1; - static final int FLAG_READ = 1 << 2; - static final int FLAG_CLOSE = 1 << 3; + protected final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(); boolean working = true; - int flag = FLAG_CONNECT; - final SessionInfo sessionInfo; final EmitterHandler handler; @@ -105,41 +103,53 @@ @Override public void run() { - connect(); while (working) { synchronized (this) { - if (flag != 0) { - try { - wait(1); - } catch (InterruptedException ignore) { - } - } + Task task = tasks.poll(); - switch (flag) { - case FLAG_FLUSH: - Queue<byte[]> queue = channelContext.dataQueue(); - if (!queue.isEmpty()) { - byte[] data; - while ((data = queue.poll()) != null) { - write(data); + if (task != null) { + switch (task.code) { + case Task.CONNECT: + connect(); + break; + case Task.FLUSH: + Queue<byte[]> queue = channelContext.dataQueue(); + if (!queue.isEmpty()) { + byte[] data; + while ((data = queue.poll()) != null) { + write(data); + } } - } - - flag = 0; - break; - case FLAG_READ: - break; - case FLAG_CLOSE: - close(); - if (listener != null) { - listener.onConnectionClosed(channelContext); - } - working = false; - flag = 0; - break; + break; + case Task.CLOSE: + close(); + if (listener != null) { + listener.onConnectionClosed(channelContext); + } + working = false; + break; + } } } } + } + + protected static class Task { + + public static final int CONNECT = 1; + public static final int FLUSH = 2; + public static final int CLOSE = 3; + + final int code; + + public Task(int code) { + this.code = code; + } } + + static final Task CONNECT_TASK = new Task(Task.CONNECT); + static final Task FLUSH_TASK = new Task(Task.FLUSH); + static final Task CLOSE_TASK = new Task(Task.CLOSE); + }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Mon Jul 29 15:47:44 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Jul 31 10:34:10 2019 +0200 @@ -13,8 +13,6 @@ import java.util.HashMap; import java.util.Map; -import static com.passus.st.emitter.socket.Connection.FLAG_CLOSE; - @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class SocketEmitter implements Emitter { @@ -64,7 +62,6 @@ return started; } - @Override public void start() { synchronized (this) { @@ -89,7 +86,11 @@ connections.forEach((sessionInfo, conn) -> { if (conn.isConnected()) { - conn.flag = FLAG_CLOSE; + try { + conn.tasks.put(Connection.CLOSE_TASK); + } catch (InterruptedException e) { + + } } conn.working = false; @@ -143,6 +144,8 @@ } }); + + connection.tasks.add(Connection.CONNECT_TASK); connection.start(); connections.put(sessionInfo, connection); }