Mercurial > stress-tester
changeset 990:ba4568bda0a3
SocketEmitter in progress
author | Devel 2 |
---|---|
date | Thu, 05 Sep 2019 08:04:42 +0200 |
parents | 6414bc0350bb |
children | 1d5d4cfd88fb |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 3 files changed, 113 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Wed Sep 04 08:59:30 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Thu Sep 05 08:04:42 2019 +0200 @@ -1,5 +1,7 @@ package com.passus.st.emitter.socket; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; import com.passus.net.SocketAddress; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.EmitterMetric; @@ -9,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -41,6 +44,8 @@ EmitterMetric metric; + ByteBuff buffer = new HeapByteBuff(); + public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) { this.sessionInfo = sessionInfo; this.handler = handler; @@ -102,11 +107,84 @@ public abstract void close(); + private void doWrite() { + Queue<byte[]> queue = channelContext.dataQueue(); + if (!queue.isEmpty()) { + byte[] data; + while ((data = queue.poll()) != null) { + write(data); + } + } + } + private void write(byte[] data) { write(data, 0, data.length); } - public abstract void write(byte[] data, int offset, int length); + protected abstract void write(byte[] data, int offset, int length); + + private void doRead() { + buffer.clear(); + + int totalReaded = 0; + try { + read(buffer); + } catch (IOException e) { + doCatchException(channelContext, e); + doClose(); + return; + } + + + if (logger.isDebugEnabled()) { + logger.debug("Readed {}B ({} -> {})", totalReaded, + channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + if (totalReaded > 0) { + try { + handler.dataReceived(channelContext, buffer); + logger.debug("Read handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + } + + protected abstract int read(ByteBuff buffer) throws IOException; + + private void doClose() { + if (logger.isDebugEnabled()) { + logger.debug("Closing session '" + channelContext.getSessionInfo() + "'."); + } + + close(); + if (listener != null) { + listener.onConnectionClosed(channelContext); + } + + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + + working = false; + } @Override public void run() { @@ -119,21 +197,14 @@ case Task.CONNECT: connect(); break; + case Task.READ: + doRead(); + break; case Task.FLUSH: - Queue<byte[]> queue = channelContext.dataQueue(); - if (!queue.isEmpty()) { - byte[] data; - while ((data = queue.poll()) != null) { - write(data); - } - } + doWrite(); break; case Task.CLOSE: - close(); - if (listener != null) { - listener.onConnectionClosed(channelContext); - } - working = false; + doClose(); break; } } @@ -144,8 +215,9 @@ protected static class Task { public static final int CONNECT = 1; - public static final int FLUSH = 2; - public static final int CLOSE = 3; + public static final int READ = 2; + public static final int FLUSH = 3; + public static final int CLOSE = 4; final int code; @@ -156,6 +228,7 @@ } static final Task CONNECT_TASK = new Task(Task.CONNECT); + static final Task READ_TASK = new Task(Task.READ); static final Task FLUSH_TASK = new Task(Task.FLUSH); static final Task CLOSE_TASK = new Task(Task.CLOSE);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Wed Sep 04 08:59:30 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Thu Sep 05 08:04:42 2019 +0200 @@ -1,5 +1,6 @@ package com.passus.st.emitter.socket; +import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; import com.passus.st.emitter.EmitterHandler; import com.passus.st.emitter.SessionInfo; @@ -8,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; @@ -20,10 +22,18 @@ private final Logger logger = LogManager.getLogger(DatagramConnection.class); + public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + private DatagramSocket socket; + private int bufferSize = DEFAULT_BUFFER_SIZE; + private java.net.SocketAddress remoteSocket; + private byte[] buffer = new byte[bufferSize]; + + private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length); + public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) { super(sessionInfo, handler, sessionMapper, listener, collectMetrics); } @@ -91,13 +101,18 @@ } - try { - handler.channelInactive(channelContext); - } catch (Exception e) { - logger.debug(e.getMessage(), e); + if (collectMetrics) { + metric.incClosedConnections(); } } + @Override + protected int read(ByteBuff buffer) throws IOException { + socket.receive(readPacket); + buffer.append(readPacket.getData(), readPacket.getOffset(), readPacket.getLength()); + return readPacket.getLength(); + } + public void write(byte[] data, int offset, int length) { try { handler.dataWriteStart(channelContext);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Sep 04 08:59:30 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Thu Sep 05 08:04:42 2019 +0200 @@ -1,6 +1,5 @@ package com.passus.st.emitter.socket; -import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; import com.passus.net.session.Session; import com.passus.st.emitter.*; @@ -38,7 +37,6 @@ @Override public void setSessionMapper(SessionMapper sessionMapper) { - Assert.notNull(sessionMapper, "sessionMapper"); this.sessionMapper = sessionMapper; } @@ -116,7 +114,6 @@ } } - @Override public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { synchronized (this) { @@ -131,6 +128,12 @@ @Override public void onConnectionClosed(ChannelContext channelContext) { synchronized (SocketEmitter.this) { + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + connections.remove(channelContext.getSessionInfo()); try { @@ -159,5 +162,4 @@ } - }