changeset 979:700a3005eaf8

SocketEmitter in progress
author Devel 2
date Mon, 29 Jul 2019 15:47:44 +0200
parents d88ea87ac0a7
children 8c03beba3161
files stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/ConnectionListener.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 4 files changed, 81 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jul 29 12:54:31 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jul 29 15:47:44 2019 +0200
@@ -15,11 +15,11 @@
     protected final Logger logger = LogManager.getLogger(getClass());
 
     static final int FLAG_CONNECT = 1;
-    static final int FLAG_FLUSH = 2;
-    static final int FLAG_READ = 3;
-    static final int FLAG_CLOSE = 4;
+    static final int FLAG_FLUSH = 1 << 1;
+    static final int FLAG_READ = 1 << 2;
+    static final int FLAG_CLOSE = 1 << 3;
 
-    private boolean working = true;
+    boolean working = true;
 
     int flag = FLAG_CONNECT;
 
@@ -29,18 +29,23 @@
 
     final SessionMapper sessionMapper;
 
+    final ConnectionListener listener;
+
     SocketAddress localAddress;
 
     SocketAddress remoteAddress;
 
     AbstractChannelContext channelContext;
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) {
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
+        this.listener = listener;
     }
 
+    public abstract boolean isConnected();
+
     protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) {
         ConnectionParams connParams = sessionMapper.map(sessionInfo);
         if (connParams == null) {
@@ -120,15 +125,21 @@
                             }
                         }
 
+                        flag = 0;
                         break;
                     case FLAG_READ:
                         break;
                     case FLAG_CLOSE:
                         close();
+                        if (listener != null) {
+                            listener.onConnectionClosed(channelContext);
+                        }
+                        working = false;
+                        flag = 0;
                         break;
-
                 }
             }
         }
+
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/ConnectionListener.java	Mon Jul 29 15:47:44 2019 +0200
@@ -0,0 +1,11 @@
+package com.passus.st.emitter.socket;
+
+import com.passus.st.emitter.ChannelContext;
+
+public interface ConnectionListener {
+
+    default void onConnectionClosed(ChannelContext channelContext) {
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Mon Jul 29 12:54:31 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Mon Jul 29 15:47:44 2019 +0200
@@ -24,8 +24,12 @@
 
     private java.net.SocketAddress remoteSocket;
 
-    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) {
-        super(sessionInfo, handler, sessionMapper);
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, listener);
+    }
+
+    public boolean isConnected() {
+        return socket.isConnected();
     }
 
     public void connect() {
@@ -81,11 +85,20 @@
 
         }
 
+
         try {
             socket.close();
         } catch (Exception ignore) {
 
         }
+
+        try {
+            handler.channelInactive(channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+
     }
 
     public void write(byte[] data, int offset, int length) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Mon Jul 29 12:54:31 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Mon Jul 29 15:47:44 2019 +0200
@@ -3,10 +3,7 @@
 import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.net.session.Session;
-import com.passus.st.emitter.Emitter;
-import com.passus.st.emitter.EmitterHandler;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.emitter.SessionMapper;
+import com.passus.st.emitter.*;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
 import org.apache.logging.log4j.LogManager;
@@ -16,6 +13,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static com.passus.st.emitter.socket.Connection.FLAG_CLOSE;
+
 @Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER)
 public class SocketEmitter implements Emitter {
 
@@ -65,6 +64,7 @@
         return started;
     }
 
+
     @Override
     public void start() {
         synchronized (this) {
@@ -87,6 +87,20 @@
                 return;
             }
 
+            connections.forEach((sessionInfo, conn) -> {
+                if (conn.isConnected()) {
+                    conn.flag = FLAG_CLOSE;
+                }
+
+                conn.working = false;
+                conn.interrupt();
+                try {
+                    conn.join(1_000);
+                } catch (InterruptedException ignore) {
+
+                }
+            });
+
             started = false;
         }
     }
@@ -109,7 +123,26 @@
             if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
                 throw new UnsupportedOperationException("Not implemented yet.");
             } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper);
+                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, new ConnectionListener() {
+
+                    @Override
+                    public void onConnectionClosed(ChannelContext channelContext) {
+                        synchronized (SocketEmitter.this) {
+                            connections.remove(channelContext.getSessionInfo());
+
+                            try {
+                                handler.channelUnregistered(channelContext);
+                            } catch (Exception e) {
+                                LOGGER.debug(e.getMessage(), e);
+                            }
+
+                            if (LOGGER.isDebugEnabled()) {
+                                LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
+                            }
+                        }
+                    }
+
+                });
                 connection.start();
                 connections.put(sessionInfo, connection);
             }