changeset 981:da4a9beebbd3

SocketEmitter in progress
author Devel 2
date Wed, 31 Jul 2019 14:01:03 +0200
parents 8c03beba3161
children ff71b37f293c
files stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java
diffstat 4 files changed, 51 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed Jul 31 14:01:03 2019 +0200
@@ -261,7 +261,7 @@
 
         NioChannelContext channelContext = keyContext.channelContext;
         Queue<ByteBuffer> queue = channelContext.dataQueue();
-        if(queue.isEmpty()) {
+        if (queue.isEmpty()) {
             return;
         }
 
@@ -285,7 +285,9 @@
                     }
 
                     if (collectMetrics) {
-                        metric.updateSentBytes(res);
+                        synchronized (metric) {
+                            metric.updateSentBytes(res);
+                        }
                     }
 
                     written += res;
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Wed Jul 31 14:01:03 2019 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.net.SocketAddress;
 import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.EmitterMetric;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.emitter.SessionMapper;
 import com.passus.st.emitter.SessionMapper.ConnectionParams;
@@ -11,7 +12,8 @@
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
 
 public abstract class Connection extends Thread {
 
@@ -35,11 +37,16 @@
 
     AbstractChannelContext channelContext;
 
-    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
+    final boolean collectMetrics;
+
+    EmitterMetric metric;
+
+    public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener, boolean collectMetrics) {
         this.sessionInfo = sessionInfo;
         this.handler = handler;
         this.sessionMapper = sessionMapper;
         this.listener = listener;
+        this.collectMetrics = collectMetrics;
     }
 
     public abstract boolean isConnected();
@@ -51,11 +58,11 @@
                 logger.debug("Unable to map session '{}'.", sessionInfo);
             }
 
-           /* if (collectMetrics) {
+            if (collectMetrics) {
                 synchronized (metric) {
                     metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
                 }
-            }*/
+            }
 
             try {
                 handler.sessionInvalidated(sessionInfo);
@@ -78,11 +85,11 @@
             logger.debug("Error occurred. " + cause.getMessage(), cause);
         }
 
-        /*if (collectMetrics) {
-                synchronized (metric) {
-                    metric.errorCaught(cause);
-                }
-        }*/
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
 
         try {
             handler.errorOccurred(channelContext, cause);
@@ -105,7 +112,7 @@
     public void run() {
         while (working) {
             synchronized (this) {
-                Task task  = tasks.poll();
+                Task task = tasks.poll();
 
                 if (task != null) {
                     switch (task.code) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Wed Jul 31 14:01:03 2019 +0200
@@ -24,8 +24,8 @@
 
     private java.net.SocketAddress remoteSocket;
 
-    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, ConnectionListener listener) {
-        super(sessionInfo, handler, sessionMapper, listener);
+    public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper, boolean collectMetrics, ConnectionListener listener) {
+        super(sessionInfo, handler, sessionMapper, listener, collectMetrics);
     }
 
     public boolean isConnected() {
@@ -85,7 +85,6 @@
 
         }
 
-
         try {
             socket.close();
         } catch (Exception ignore) {
@@ -97,8 +96,6 @@
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
-
-
     }
 
     public void write(byte[] data, int offset, int length) {
@@ -111,8 +108,14 @@
             DatagramPacket packet = new DatagramPacket(data, offset, data.length, remoteSocket);
             socket.send(packet);
 
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.updateSentBytes(data.length - offset);
+                }
+            }
+
             if (logger.isDebugEnabled()) {
-                logger.debug("Written {}B ({} -> {})", length,
+                logger.debug("Written {}B ({} -> {})", length - offset,
                         channelContext.getLocalAddress(), channelContext.getRemoteAddress());
             }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Jul 31 10:34:10 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java	Wed Jul 31 14:01:03 2019 +0200
@@ -62,6 +62,10 @@
         return started;
     }
 
+    public int getConnectionsNumber() {
+        return connections.size();
+    }
+
     @Override
     public void start() {
         synchronized (this) {
@@ -117,33 +121,31 @@
     public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException {
         synchronized (this) {
             checkStarted();
-            if (connections.containsKey(sessionInfo)) {
-                throw new IOException("Connection established already.");
-            }
 
             if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) {
                 throw new UnsupportedOperationException("Not implemented yet.");
             } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) {
-                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, new ConnectionListener() {
+                DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper, collectMetrics,
+                        new ConnectionListener() {
 
-                    @Override
-                    public void onConnectionClosed(ChannelContext channelContext) {
-                        synchronized (SocketEmitter.this) {
-                            connections.remove(channelContext.getSessionInfo());
+                            @Override
+                            public void onConnectionClosed(ChannelContext channelContext) {
+                                synchronized (SocketEmitter.this) {
+                                    connections.remove(channelContext.getSessionInfo());
 
-                            try {
-                                handler.channelUnregistered(channelContext);
-                            } catch (Exception e) {
-                                LOGGER.debug(e.getMessage(), e);
+                                    try {
+                                        handler.channelUnregistered(channelContext);
+                                    } catch (Exception e) {
+                                        LOGGER.debug(e.getMessage(), e);
+                                    }
+
+                                    if (LOGGER.isDebugEnabled()) {
+                                        LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
+                                    }
+                                }
                             }
 
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("Closed session '" + channelContext.getSessionInfo() + "'.");
-                            }
-                        }
-                    }
-
-                });
+                        });
 
                 connection.tasks.add(Connection.CONNECT_TASK);
                 connection.start();