changeset 1147:2d394f066c80

EmitterMetric.bindedAddresses
author Devel 2
date Fri, 12 Jun 2020 10:35:10 +0200
parents c25812252c9d
children 2a19b0b893d9
files stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java
diffstat 5 files changed, 44 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Fri Jun 12 10:34:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Fri Jun 12 10:35:10 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.commons.Assert;
 import com.passus.commons.metric.Metric;
+import com.passus.net.IpAddress;
 import com.passus.net.SocketAddress;
 import com.passus.st.utils.NetExceptionsCategory;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -37,7 +38,8 @@
 
     private final Map<SocketAddress, MutableInt> remoteSocketConnections = new HashMap<>();
 
-    //private final Set<SocketAddress> bindedSockets = new HashSet<>();
+    private final Map<IpAddress, MutableInt> bindedAddresses = new HashMap<>();
+
     private final Map<String, MutableInt> errors = new HashMap<>();
 
     private boolean active = false;
@@ -58,7 +60,7 @@
         attrs.put("remoteSocketConnections", (Serializable) remoteSocketConnections);
         attrs.put("errors", (Serializable) errors);
         attrs.put("dropped", dropped);
-        //attrs.put("bindedSockets", (Serializable) bindedSockets);
+        attrs.put("bindedAddresses", (Serializable) bindedAddresses);
     }
 
     @Override
@@ -116,11 +118,27 @@
 
     public void addRemoteSocketConnection(SocketAddress address) {
         establishedConnections.increment();
         MutableInt conns = remoteSocketConnections.get(address);
         if (conns == null) {
             remoteSocketConnections.put(address, new MutableInt(1));
         } else {
             conns.increment();
         }
     }
+
+    /**
+     * Counts one more use of {@code ip} as a local address.
+     * The first call for a given address stores a counter of 1; later calls increment it.
+     * No locking is done here and the backing map is a plain {@code HashMap}.
+     *
+     * @param ip local address to count
+     */
+    public void addBindAddress(IpAddress ip) {
+        MutableInt counter = bindedAddresses.get(ip);
+        if (counter != null) {
+            counter.increment();
+            return;
+        }
+        bindedAddresses.put(ip, new MutableInt(1));
+    }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Fri Jun 12 10:34:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Fri Jun 12 10:35:10 2020 +0200
@@ -128,7 +128,9 @@
             doCatchException(key, e);
             key.cancel();
             if (collectMetrics) {
-                metric.incConnectionsErrors();
+                synchronized (metric) {
+                    metric.incConnectionsErrors();
+                }
             }
 
             return;
@@ -136,8 +138,10 @@
 
         try {
             if (collectMetrics) {
-                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
-                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
+                synchronized (metric) {
+                    metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
+                    metric.addBindAddress(keyContext.channelContext.getLocalAddress().getIp());
+                }
             }
 
             if (logger.isDebugEnabled()) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Fri Jun 12 10:34:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java	Fri Jun 12 10:35:10 2020 +0200
@@ -98,7 +98,7 @@
             socketParameters.apply(channel);
 
             SocketAddress bindAddress = connParams.getBindAddress();
-            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+            if (bindAddress != null && !ANY_SOCKET.equals(bindAddress)) {
                 channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
             }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Fri Jun 12 10:34:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java	Fri Jun 12 10:35:10 2020 +0200
@@ -63,8 +63,8 @@
 
         try {
             SocketAddress bindAddress = connParams.getBindAddress();
-            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+            if (bindAddress != null && !ANY_SOCKET.equals(bindAddress)) {
                 socket = new DatagramSocket(socketAddressToJdkSocket(bindAddress));
             } else {
                 socket = new DatagramSocket();
             }
@@ -83,6 +83,13 @@
         java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress();
         remoteSocket = socketAddressToJdkSocket(remoteAddress);
         localAddress = jdkSocketToSocketAddress(localSocketAddress);
+        // Count the local IP reported by the socket itself. It is registered here only
+        // (not in the bind branch) so that an explicit bind is not counted twice.
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.addBindAddress(localAddress.getIp());
+            }
+        }
 
         channelContext = new DatagramSocketChannelContext(this);
         try {
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Fri Jun 12 10:34:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketConnection.java	Fri Jun 12 10:35:10 2020 +0200
@@ -80,8 +80,8 @@
                 socket = new Socket();
             }
 
-            if (bindAddress != null && !ANY_SOCKET.equals(bindAddress)) {
+            if (bindAddress != null) {
                 socket.bind(socketAddressToJdkSocket(bindAddress));
             }
 
             socketParameters.apply(socket);
@@ -98,6 +98,12 @@
         java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress();
         remoteSocket = socketAddressToJdkSocket(remoteAddress);
         localAddress = jdkSocketToSocketAddress(localSocketAddress);
+        // Record the local IP that the socket itself reports.
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.addBindAddress(localAddress.getIp());
+            }
+        }
 
         channelContext = new DatagramSocketChannelContext(this);
         try {