changeset 990:ba4568bda0a3

SocketEmitter in progress
author Devel 2
date Thu, 05 Sep 2019 08:04:42 +0200
parents 6414bc0350bb
children 1d5d4cfd88fb
files stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 3 files changed, 113 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Sep 04 08:59:30 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Thu Sep 05 08:04:42 2019 +0200
@@ -1,5 +1,7 @@
 package com.passus.st.emitter.socket;
 
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.EmitterMetric;
@@ -9,6 +11,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +44,8 @@
 
     EmitterMetric metric;
 
+    ByteBuff buffer = new HeapByteBuff();
+
     public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
@@ -102,11 +107,84 @@
 
     public abstract void close();
 
+    private void doWrite() {
+        Queue<byte[]> queue = channelContext.dataQueue();
+        if (!queue.isEmpty()) {
+            byte[] data;
+            while ((data = queue.poll()) != null) {
+                write(data);
+            }
+        }
+    }
+
     private void write(byte[] data) {
         write(data, 0, data.length);
     }
 
-    public abstract void write(byte[] data, int offset, int length);
+    protected abstract void write(byte[] data, int offset, int length);
+
+    private void doRead() {
+        buffer.clear();
+
+        int totalReaded = 0;
+        try {
+            read(buffer);
+        } catch (IOException e) {
+            doCatchException(channelContext, e);
+            doClose();
+            return;
+        }
+
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Readed {}B ({} -> {})", totalReaded,
+                    channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+        }
+
+        if (totalReaded > 0) {
+            try {
+                handler.dataReceived(channelContext, buffer);
+                logger.debug("Read handled.");
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+        }
+    }
+
+    protected abstract int read(ByteBuff buffer) throws IOException;
+
+    private void doClose() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing session '" + channelContext.getSessionInfo() + "'.");
+        }
+
+        close();
+        if (listener != null) {
+            listener.onConnectionClosed(channelContext);
+        }
+
+        try {
+            handler.channelInactive(channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        try {
+            handler.channelUnregistered(channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+
+        working = false;
+    }
 
     @Override
     public void run() {
@@ -119,21 +197,14 @@
                         case Task.CONNECT:
                             connect();
                             break;
+                        case Task.READ:
+                            doRead();
+                            break;
                         case Task.FLUSH:
-                            Queue<byte[]> queue = channelContext.dataQueue();
-                            if (!queue.isEmpty()) {
-                                byte[] data;
-                                while ((data = queue.poll()) != null) {
-                                    write(data);
-                                }
-                            }
+                            doWrite();
                             break;
                         case Task.CLOSE:
-                            close();
-                            if (listener != null) {
-                                listener.onConnectionClosed(channelContext);
-                            }
-                            working = false;
+                            doClose();
                             break;
                     }
                 }
@@ -144,8 +215,9 @@
     protected static class Task {
 
         public static final int CONNECT = 1;
-        public static final int FLUSH = 2;
-        public static final int CLOSE = 3;
+        public static final int READ = 2;
+        public static final int FLUSH = 3;
+        public static final int CLOSE = 4;
 
         final int code;
 
@@ -156,6 +228,7 @@
     }
 
     static final Task CONNECT_TASK = new Task(Task.CONNECT);
+    static final Task READ_TASK = new Task(Task.READ);
     static final Task FLUSH_TASK = new Task(Task.FLUSH);
     static final Task CLOSE_TASK = new Task(Task.CLOSE);
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed Sep 04 08:59:30 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Thu Sep 05 08:04:42 2019 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.emitter.socket;
 
+import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
 import com.passus.st.emitter.EmitterHandler;
 import com.passus.st.emitter.SessionInfo;
@@ -8,6 +9,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.SocketException;
@@ -20,10 +22,18 @@
 
     private final Logger logger = LogManager.getLogger(DatagramConnection.class);
 
+    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
     private DatagramSocket socket;
 
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+
     private java.net.SocketAddress remoteSocket;
 
+    private byte[] buffer = new byte[bufferSize];
+
+    private DatagramPacket readPacket = new DatagramPacket(buffer, buffer.length);
+
     public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
         super(sessionInfo, handler, sessionMapper, listener, collectMetrics);
     }
@@ -91,13 +101,18 @@
 
         }
 
-        try {
-            handler.channelInactive(channelContext);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
+        if (collectMetrics) {
+            metric.incClosedConnections();
         }
     }
 
+    @Override
+    protected int read(ByteBuff buffer) throws IOException {
+        socket.receive(readPacket);
+        buffer.append(readPacket.getData(), readPacket.getOffset(), readPacket.getLength());
+        return readPacket.getLength();
+    }
+
     public void write(byte[] data, int offset, int length) {
         try {
             handler.dataWriteStart(channelContext);
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Sep 04 08:59:30 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Thu Sep 05 08:04:42 2019 +0200
@@ -1,6 +1,5 @@
 package com.passus.st.emitter.socket;
 
-import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.net.session.Session;
 import com.passus.st.emitter.*;
@@ -38,7 +37,6 @@
 
     @Override
     public void setSessionMapper(SessionMapper sessionMapper) {
-        Assert.notNull(sessionMapper, "sessionMapper");
         this.sessionMapper = sessionMapper;
     }
 
@@ -116,7 +114,6 @@
         }
     }
 
-
     @Override
     public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
         synchronized (this) {
@@ -131,6 +128,12 @@
                             @Override
                             public void onConnectionClosed(ChannelContext channelContext) {
                                 synchronized (SocketEmitter.this) {
+                                    try {
+                                        handler.channelInactive(channelContext);
+                                    } catch (Exception e) {
+                                        LOGGER.debug(e.getMessage(), e);
+                                    }
+
                                     connections.remove(channelContext.getSessionInfo());
 
                                     try {
@@ -159,5 +162,4 @@
 
     }
 
-
 }