Mercurial > stress-tester
changeset 981:da4a9beebbd3
SocketEmitter in progress
author | Devel 2 |
---|---|
date | Wed, 31 Jul 2019 14:01:03 +0200 |
parents | 8c03beba3161 |
children | ff71b37f293c |
files | stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 4 files changed, 51 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed Jul 31 14:01:03 2019 +0200
@@ -261,7 +261,7 @@
         NioChannelContext channelContext = keyContext.channelContext;
         Queue<ByteBuffer> queue = channelContext.dataQueue();
 
-        if(queue.isEmpty()) {
+        if (queue.isEmpty()) {
             return;
         }
 
@@ -285,7 +285,9 @@
                 }
 
                 if (collectMetrics) {
-                    metric.updateSentBytes(res);
+                    synchronized (metric) {
+                        metric.updateSentBytes(res);
+                    }
                 }
 
                 written += res;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Jul 31 14:01:03 2019 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.net.SocketAddress;
 import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.EmitterMetric;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
@@ -11,7 +12,8 @@
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
 
 public abstract class Connection extends Thread {
 
@@ -35,11 +37,16 @@
 
     AbstractChannelContext channelContext;
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
+    final boolean collectMetrics;
+
+    EmitterMetric metric;
+
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
         this.listener = listener;
+        this.collectMetrics = collectMetrics;
     }
 
     public abstract boolean isConnected();
@@ -51,11 +58,11 @@
             logger.debug("Unable to map session '{}'.", sessionInfo);
         }
 
-        /* if (collectMetrics) {
+        if (collectMetrics) {
             synchronized (metric) {
                 metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
             }
-        }*/
+        }
 
         try {
             handler.sessionInvalidated(sessionInfo);
@@ -78,11 +85,11 @@
             logger.debug("Error occurred. " + cause.getMessage(), cause);
         }
 
-        /*if (collectMetrics) {
-            synchronized (metric) {
-                metric.errorCaught(cause);
-            }
-        }*/
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
 
         try {
             handler.errorOccurred(channelContext, cause);
@@ -105,7 +112,7 @@
     public void run() {
         while (working) {
             synchronized (this) {
-                Task task = tasks.poll();
+                Task task = tasks.poll();
 
                 if (task != null) {
                     switch (task.code) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed Jul 31 14:01:03 2019 +0200
@@ -24,8 +24,8 @@
 
     private java.net.SocketAddress remoteSocket;
 
-    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, listener);
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, listener, collectMetrics);
     }
 
     public boolean isConnected() {
@@ -85,7 +85,6 @@
         }
 
 
-
         try {
             socket.close();
         } catch (Exception ignore) {
@@ -97,8 +96,6 @@
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
-
-
     }
 
     public void write(byte[] data, int offset, int length) {
@@ -111,8 +108,14 @@
             DatagramPacket packet = new DatagramPacket(data, offset, data.length, remoteSocket);
             socket.send(packet);
 
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.updateSentBytes(data.length - offset);
+                }
+            }
+
             if (logger.isDebugEnabled()) {
-                logger.debug("Written {}B ({} -> {})", length,
+                logger.debug("Written {}B ({} -> {})", length - offset,
                         channelContext.getLocalAddress(), channelContext.getRemoteAddress());
             }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Jul 31 10:34:10 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Wed Jul 31 14:01:03 2019 +0200 @@ -62,6 +62,10 @@ return started; } + public int getConnectionsNumber() { + return connections.size(); + } + @Override public void start() { synchronized (this) { @@ -117,33 +121,31 @@ public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { synchronized (this) { checkStarted(); - if (connections.containsKey(sessionInfo)) { - throw new IOException("Connection established already."); - } if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { throw new UnsupportedOperationException("Not implemented yet."); } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { - DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, new ConnectionListener() { + DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, + new ConnectionListener() { - @Override - public void onConnectionClosed(ChannelContext channelContext) { - synchronized (SocketEmitter.this) { - connections.remove(channelContext.getSessionInfo()); + @Override + public void onConnectionClosed(ChannelContext channelContext) { + synchronized (SocketEmitter.this) { + connections.remove(channelContext.getSessionInfo()); - try { - handler.channelUnregistered(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + } } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); - } - } - } - - }); + }); connection.tasks.add(Connection.CONNECT_TASK); 
connection.start();