changeset 991:1d5d4cfd88fb

SocketEmitter in progress
author Devel 2
date Thu, 05 Sep 2019 11:36:59 +0200
parents ba4568bda0a3
children 2e7e654f2ad5
files stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 6 files changed, 244 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Thu Sep 05 08:04:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java	Thu Sep 05 11:36:59 2019 +0200
@@ -14,7 +14,7 @@
 
 public abstract class AbstractChannelContext<K> implements ChannelContext<K> {
 
-    private final Connection connection;
+    protected final Connection connection;
 
     private boolean bidirectional;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Thu Sep 05 08:04:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Thu Sep 05 11:36:59 2019 +0200
@@ -46,7 +46,7 @@
 
     ByteBuff buffer = new HeapByteBuff();
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) {
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
@@ -56,6 +56,8 @@
 
     public abstract boolean isConnected();
 
+    public abstract boolean isConnectionPending();
+
     protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) {
         ConnectionParams connParams = sessionMapper.map(sessionInfo);
         if (connParams == null) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Thu Sep 05 08:04:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Thu Sep 05 11:36:59 2019 +0200
@@ -20,36 +20,43 @@
 
 public class DatagramConnection extends Connection {
 
-    private final Logger logger = LogManager.getLogger(DatagramConnection.class);
+    private static final Logger LOGGER = LogManager.getLogger(DatagramConnection.class);
 
     public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
     private DatagramSocket socket;
 
+    private java.net.SocketAddress remoteSocket;
+
     private int bufferSize = DEFAULT_BUFFER_SIZE;
 
-    private java.net.SocketAddress remoteSocket;
-
     private byte[] buffer = new byte[bufferSize];
 
     private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length);
 
-    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, listener, collectMetrics);
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler,
+                              SessionMapper sessionMapper, boolean collectMetrics,
+                              ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, collectMetrics, listener);
     }
 
     public boolean isConnected() {
         return socket.isConnected();
     }
 
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
     public void connect() {
         ConnectionParams connParams = getConnParams(sessionInfo, handler);
         if (connParams == null) {
             return;
         }
 
-        if (logger.isDebugEnabled()) {
-            logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
         }
 
         try {
@@ -116,29 +123,29 @@
     public void write(byte[] data, int offset, int length) {
         try {
             handler.dataWriteStart(channelContext);
-            if (logger.isDebugEnabled()) {
-                logger.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
             }
 
-            DatagramPacket packet = new DatagramPacket(data, offset, data.length, remoteSocket);
+            DatagramPacket packet = new DatagramPacket(data, offset, length, remoteSocket);
             socket.send(packet);
 
             if (collectMetrics) {
                 synchronized (metric) {
-                    metric.updateSentBytes(data.length - offset);
+                    metric.updateSentBytes(length);
                 }
             }
 
-            if (logger.isDebugEnabled()) {
-                logger.debug("Written {}B ({} -> {})", length - offset,
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Written {}B ({} -> {})", length,
                         channelContext.getLocalAddress(), channelContext.getRemoteAddress());
             }
 
             try {
                 handler.dataWritten(channelContext);
-                logger.debug("Write handled.");
+                LOGGER.debug("Write handled.");
             } catch (Exception e) {
-                logger.debug(e.getMessage(), e);
+                LOGGER.debug(e.getMessage(), e);
             }
         } catch (Exception ex) {
             doCatchException(channelContext, ex);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketChannelContext.java	Thu Sep 05 11:36:59 2019 +0200
@@ -0,0 +1,20 @@
+package com.passus.st.emitter.socket;
+
+public class SocketChannelContext<K> extends AbstractChannelContext<K> {
+
+    public SocketChannelContext(Connection connection) {
+        super(connection);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connection.isConnected();
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return connection.isConnectionPending();
+    }
+
+
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Thu Sep 05 11:36:59 2019 +0200
@@ -0,0 +1,164 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.SocketAddress;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import static com.passus.net.utils.AddressUtils.jdkSocketToSocketAddress;
+import static com.passus.net.utils.AddressUtils.socketAddressToJdkSocket;
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+public class SocketConnection extends Connection {
+
+    private static final Logger LOGGER = LogManager.getLogger(SocketConnection.class);
+
+    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+    private Socket socket;
+
+    private boolean connectionPending = true;
+
+    private java.net.SocketAddress remoteSocket;
+
+    private InputStream in;
+
+    private OutputStream out;
+
+    private SocketAddress remoteAddress;
+
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+
+    private byte[] readBuffer = new byte[bufferSize];
+
+    public SocketConnection(SessionInfo sessionInfo, EmitterHandler handler,
+                            SessionMapper sessionMapper, boolean collectMetrics,
+                            ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return socket.isConnected();
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return connectionPending;
+    }
+
+    @Override
+    public void connect() {
+        SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
+        if (connParams == null) {
+            return;
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        try {
+            SocketAddress bindAddress = connParams.getBindAddress();
+            socket = new Socket();
+            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                socket.bind(socketAddressToJdkSocket(bindAddress));
+            }
+
+
+        } catch (IOException ex) {
+            doCatchException(channelContext, ex);
+            return;
+        }
+
+        remoteAddress = connParams.getRemoteAddress();
+        if (remoteAddress == null) {
+            remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+        }
+
+        java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress();
+        remoteSocket = socketAddressToJdkSocket(remoteAddress);
+        localAddress = jdkSocketToSocketAddress(localSocketAddress);
+
+        channelContext = new DatagramSocketChannelContext(this);
+        try {
+            handler.channelRegistered(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+
+        try {
+            socket.connect(remoteSocket);
+            in = socket.getInputStream();
+            out = socket.getOutputStream();
+            handler.channelActive(channelContext);
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+
+        connectionPending = false;
+    }
+
+    @Override
+    public void close() {
+        try {
+            socket.close();
+        } catch (Exception ignore) {
+
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+    }
+
+    @Override
+    protected void write(byte[] data, int offset, int length) {
+        try {
+            handler.dataWriteStart(channelContext);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+            }
+
+            out.write(data, offset, length);
+
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.updateSentBytes(length);
+                }
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Written {}B ({} -> {})", length,
+                        channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+            }
+
+            try {
+                handler.dataWritten(channelContext);
+                LOGGER.debug("Write handled.");
+            } catch (Exception e) {
+                LOGGER.debug(e.getMessage(), e);
+            }
+        } catch (Exception ex) {
+            doCatchException(channelContext, ex);
+        }
+    }
+
+    @Override
+    protected int read(ByteBuff buffer) throws IOException {
+        int readed = in.read(readBuffer);
+        if (readed > 0) {
+            buffer.append(readBuffer, 0, readed);
+        }
+
+        return readed;
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Thu Sep 05 08:04:42 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Thu Sep 05 11:36:59 2019 +0200
@@ -119,41 +119,45 @@
         synchronized (this) {
             checkStarted();
 
-            if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
-                throw new UnsupportedOperationException("Not implemented yet.");
-            } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics,
-                        new ConnectionListener() {
-
-                            @Override
-                            public void onConnectionClosed(ChannelContext channelContext) {
-                                synchronized (SocketEmitter.this) {
-                                    try {
-                                        handler.channelInactive(channelContext);
-                                    } catch (Exception e) {
-                                        LOGGER.debug(e.getMessage(), e);
-                                    }
-
-                                    connections.remove(channelContext.getSessionInfo());
+            ConnectionListener listener = new ConnectionListener() {
 
-                                    try {
-                                        handler.channelUnregistered(channelContext);
-                                    } catch (Exception e) {
-                                        LOGGER.debug(e.getMessage(), e);
-                                    }
+                @Override
+                public void onConnectionClosed(ChannelContext channelContext) {
+                    synchronized (SocketEmitter.this) {
+                        try {
+                            handler.channelInactive(channelContext);
+                        } catch (Exception e) {
+                            LOGGER.debug(e.getMessage(), e);
+                        }
 
-                                    if (LOGGER.isDebugEnabled()) {
-                                        LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
-                                    }
-                                }
-                            }
+                        connections.remove(channelContext.getSessionInfo());
 
-                        });
+                        try {
+                            handler.channelUnregistered(channelContext);
+                        } catch (Exception e) {
+                            LOGGER.debug(e.getMessage(), e);
+                        }
 
-                connection.tasks.add(Connection.CONNECT_TASK);
-                connection.start();
-                connections.put(sessionInfo, connection);
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
+                        }
+                    }
+                }
+
+            };
+
+            Connection connection;
+            if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
+                connection = new SocketConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+            } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
+                connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics, listener);
+            } else {
+                throw new IllegalArgumentException("Not supported transport " + sessionInfo.getTransport() + ".");
             }
+
+            connection.tasks.add(Connection.CONNECT_TASK);
+            connection.start();
+            connections.put(sessionInfo, connection);
         }
     }