Mercurial > stress-tester
changeset 991:1d5d4cfd88fb
SocketEmitter in progress
field | value |
---|---|
author | Devel 2 |
date | Thu, 05 Sep 2019 11:36:59 +0200 |
parents | ba4568bda0a3 |
children | 2e7e654f2ad5 |
files | stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java |
diffstat | 6 files changed, 244 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Thu Sep 05 08:04:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Thu Sep 05 11:36:59 2019 +0200 @@ -14,7 +14,7 @@ public abstract class AbstractChannelContext<K> implements ChannelContext<K> { - private final Connection connection; + protected final Connection connection; private boolean bidirectional;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Thu Sep 05 08:04:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Thu Sep 05 11:36:59 2019 +0200 @@ -46,7 +46,7 @@ ByteBuff buffer = new HeapByteBuff(); - public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) { + public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) { this.sessionInfo = sessionInfo; this.handler = handler; this.sessionMapper = sessionMapper; @@ -56,6 +56,8 @@ public abstract boolean isConnected(); + public abstract boolean isConnectionPending(); + protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) { ConnectionParams connParams = sessionMapper.map(sessionInfo); if (connParams == null) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Thu Sep 05 08:04:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Thu Sep 05 11:36:59 2019 +0200 @@ -20,36 +20,43 @@ public class DatagramConnection extends Connection { - private final Logger logger = LogManager.getLogger(DatagramConnection.class); + private static final Logger LOGGER = LogManager.getLogger(DatagramConnection.class); public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; private DatagramSocket socket; + private java.net.SocketAddress remoteSocket; + private int bufferSize = DEFAULT_BUFFER_SIZE; - private java.net.SocketAddress remoteSocket; - private byte[] buffer = new byte[bufferSize]; private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length); - public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) { - super(sessionInfo, handler, sessionMapper, listener, collectMetrics); + public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, + SessionMapper sessionMapper, boolean collectMetrics, + ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, collectMetrics, listener); } public boolean isConnected() { return socket.isConnected(); } + @Override + public boolean isConnectionPending() { + return false; + } + public void connect() { ConnectionParams connParams = getConnParams(sessionInfo, handler); if (connParams == null) { return; } - if (logger.isDebugEnabled()) { - logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registering UDP session '{}'. 
Mapped connection parameters '{}'.", sessionInfo, connParams); } try { @@ -116,29 +123,29 @@ public void write(byte[] data, int offset, int length) { try { handler.dataWriteStart(channelContext); - if (logger.isDebugEnabled()) { - logger.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); } - DatagramPacket packet = new DatagramPacket(data, offset, data.length, remoteSocket); + DatagramPacket packet = new DatagramPacket(data, offset, length, remoteSocket); socket.send(packet); if (collectMetrics) { synchronized (metric) { - metric.updateSentBytes(data.length - offset); + metric.updateSentBytes(length); } } - if (logger.isDebugEnabled()) { - logger.debug("Written {}B ({} -> {})", length - offset, + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Written {}B ({} -> {})", length, channelContext.getLocalAddress(), channelContext.getRemoteAddress()); } try { handler.dataWritten(channelContext); - logger.debug("Write handled."); + LOGGER.debug("Write handled."); } catch (Exception e) { - logger.debug(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); } } catch (Exception ex) { doCatchException(channelContext, ex);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java Thu Sep 05 11:36:59 2019 +0200 @@ -0,0 +1,20 @@ +package com.passus.st.emitter.socket; + +public class SocketChannelContext<K> extends AbstractChannelContext<K> { + + public SocketChannelContext(Connection connection) { + super(connection); + } + + @Override + public boolean isConnected() { + return connection.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return connection.isConnectionPending(); + } + + +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java Thu Sep 05 11:36:59 2019 +0200 @@ -0,0 +1,164 @@ +package com.passus.st.emitter.socket; + +import com.passus.data.ByteBuff; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import static com.passus.net.utils.AddressUtils.jdkSocketToSocketAddress; +import static com.passus.net.utils.AddressUtils.socketAddressToJdkSocket; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; + +public class SocketConnection extends Connection { + + private static final Logger LOGGER = LogManager.getLogger(SocketConnection.class); + + public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + + private Socket socket; + + private boolean connectionPending = true; + + private java.net.SocketAddress remoteSocket; + + private InputStream in; + + private OutputStream out; + + private SocketAddress remoteAddress; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + + private byte[] readBuffer = new byte[bufferSize]; + + public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler, + SessionMapper sessionMapper, boolean collectMetrics, + ConnectionListener listener) { + super(sessionInfo, handler, sessionMapper, collectMetrics, listener); + } + + @Override + public boolean isConnected() { + return socket.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return connectionPending; + } + + @Override + public void connect() { + SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler); + if (connParams == null) { + return; + } + + if (LOGGER.isDebugEnabled()) { + 
LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + try { + SocketAddress bindAddress = connParams.getBindAddress(); + socket = new Socket(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + socket.bind(socketAddressToJdkSocket(bindAddress)); + } + + + } catch (IOException ex) { + doCatchException(channelContext, ex); + return; + } + + remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress(); + remoteSocket = socketAddressToJdkSocket(remoteAddress); + localAddress = jdkSocketToSocketAddress(localSocketAddress); + + channelContext = new DatagramSocketChannelContext(this); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + try { + socket.connect(remoteSocket); + in = socket.getInputStream(); + out = socket.getOutputStream(); + handler.channelActive(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + connectionPending = false; + } + + @Override + public void close() { + try { + socket.close(); + } catch (Exception ignore) { + + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + } + + @Override + protected void write(byte[] data, int offset, int length) { + try { + handler.dataWriteStart(channelContext); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + out.write(data, offset, length); + + if (collectMetrics) { + synchronized (metric) { + metric.updateSentBytes(length); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Written {}B ({} -> {})", length, + channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + try { + 
handler.dataWritten(channelContext); + LOGGER.debug("Write handled."); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + } + + @Override + protected int read(ByteBuff buffer) throws IOException { + int readed = in.read(readBuffer); + if (readed > 0) { + buffer.append(readBuffer, 0, readed); + } + + return readed; + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Thu Sep 05 08:04:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Thu Sep 05 11:36:59 2019 +0200 @@ -119,41 +119,45 @@ synchronized (this) { checkStarted(); - if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { - throw new UnsupportedOperationException("Not implemented yet."); - } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { - DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, - new ConnectionListener() { - - @Override - public void onConnectionClosed(ChannelContext channelContext) { - synchronized (SocketEmitter.this) { - try { - handler.channelInactive(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } - - connections.remove(channelContext.getSessionInfo()); + ConnectionListener listener = new ConnectionListener() { - try { - handler.channelUnregistered(channelContext); - } catch (Exception e) { - LOGGER.debug(e.getMessage(), e); - } + @Override + public void onConnectionClosed(ChannelContext channelContext) { + synchronized (SocketEmitter.this) { + try { + handler.channelInactive(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); - } - } - } + connections.remove(channelContext.getSessionInfo()); - }); + try { + handler.channelUnregistered(channelContext); + } catch (Exception e) { + LOGGER.debug(e.getMessage(), e); + } - connection.tasks.add(Connection.CONNECT_TASK); - connection.start(); - connections.put(sessionInfo, connection); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'."); + } + } + } + + }; + + Connection connection; + if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { + connection = new 
SocketConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener); + } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { + connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener); + } else { + throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + "."); } + + connection.tasks.add(Connection.CONNECT_TASK); + connection.start(); + connections.put(sessionInfo, connection); } }