Mercurial > stress-tester
changeset 979:700a3005eaf8
SocketEmitter in progress
author | Devel 2 |
---|---|
date | Mon, 29 Jul 2019 15:47:44 +0200 |
parents | d88ea87ac0a7 |
children | 8c03beba3161 |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/ConnectionListener.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 4 files changed, 81 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jul 29 12:54:31 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jul 29 15:47:44 2019 +0200 @@ -15,11 +15,11 @@ protected final Logger logger = LogManager.getLogger(getClass()); static final int FLAG_CONNECT = 1; - static final int FLAG_FLUSH = 2; - static final int FLAG_READ = 3; - static final int FLAG_CLOSE = 4; + static final int FLAG_FLUSH = 1 << 1; + static final int FLAG_READ = 1 << 2; + static final int FLAG_CLOSE = 1 << 3; - private boolean working = true; + boolean working = true; int flag = FLAG_CONNECT; @@ -29,18 +29,23 @@ final SessionMapper sessionMapper; + final ConnectionListener listener; + SocketAddress localAddress; SocketAddress remoteAddress; AbstractChannelContext channelContext; - public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) { + public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) { this.sessionInfo = sessionInfo; this.handler = handler; this.sessionMapper = sessionMapper; + this.listener = listener; } + public abstract boolean isConnected(); + protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) { ConnectionParams connParams = sessionMapper.map(sessionInfo); if (connParams == null) { @@ -120,15 +125,21 @@ } } + flag = 0; break; case FLAG_READ: break; case FLAG_CLOSE: close(); + if (listener != null) { + listener.onConnectionClosed(channelContext); + } + working = false; + flag = 0; break; - } } } + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/ConnectionListener.java Mon Jul 29 15:47:44 2019 +0200 @@ -0,0 +1,11 @@ +package com.passus.st.emitter.socket; + +import com.passus.st.emitter.ChannelContext; + +public interface ConnectionListener { + + default void onConnectionClosed(ChannelContext channelContext) { + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Mon Jul 29 12:54:31 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Mon Jul 29 15:47:44 2019 +0200 @@ -24,8 +24,12 @@ private java.net.SocketAddress remoteSocket; - public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) { - super(sessionInfo, handler, sessionMapper); + public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, listener); + } + + public boolean isConnected() { + return socket.isConnected(); } public void connect() { @@ -81,11 +85,20 @@ } + try { socket.close(); } catch (Exception ignore) { } + + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + } public void write(byte[] data, int offset, int length) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Mon Jul 29 12:54:31 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Mon Jul 29 15:47:44 2019 +0200 @@ -3,10 +3,7 @@ import com.passus.commons.Assert; import com.passus.commons.annotations.Plugin; import com.passus.net.session.Session; -import com.passus.st.emitter.Emitter; -import com.passus.st.emitter.EmitterHandler; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.*; import com.passus.st.metric.MetricsContainer; import com.passus.st.plugin.PluginConstants; import org.apache.logging.log4j.LogManager; @@ -16,6 +13,8 @@ import java.util.HashMap; import java.util.Map; +import static com.passus.st.emitter.socket.Connection.FLAG_CLOSE; + @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) public class SocketEmitter implements Emitter { @@ -65,6 +64,7 @@ return started; } + @Override public void start() { synchronized (this) { @@ -87,6 +87,20 @@ return; } + connections.forEach((sessionInfo, conn) -> { + if (conn.isConnected()) { + conn.flag = FLAG_CLOSE; + } + + conn.working = false; + conn.interrupt(); + try { + conn.join(1_000); + } catch (InterruptedException ignore) { + + } + }); + started = false; } } @@ -109,7 +123,26 @@ if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { throw new UnsupportedOperationException("Not implemented yet."); } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { - DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper); + DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, new ConnectionListener() { + + @Override + public void onConnectionClosed(ChannelContext channelContext) { + synchronized (SocketEmitter.this) { + connections.remove(channelContext.getSessionInfo()); + + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + } + } + + }); connection.start(); connections.put(sessionInfo, connection); }