changeset 518:3ca6df6dd9cb

ST-70
author Devel 2
date Tue, 22 Aug 2017 15:03:36 +0200
parents 36d1920e57f0
children 7178dc5af601
files stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/RuleBasedSessionMapper.java stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java stress-tester/src/main/java/com/passus/st/emitter/SimpleSocketMatcher.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java
diffstat 7 files changed, 84 insertions(+), 107 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Tue Aug 22 15:03:36 2017 +0200
@@ -11,6 +11,10 @@
  */
 public interface ChannelContext {
 
+    public boolean isConnected();
+
+    public boolean isConnectionPending();
+
     public void write(byte[] data, int offset, int length) throws IOException;
 
     public void write(ByteBuff data) throws IOException;
--- a/stress-tester/src/main/java/com/passus/st/emitter/RuleBasedSessionMapper.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/RuleBasedSessionMapper.java	Tue Aug 22 15:03:36 2017 +0200
@@ -43,66 +43,7 @@
      */
     public void addRule(String rule) throws ParseException {
         Assert.notEmpty(rule, "rule");
-        int pos = rule.indexOf(RULE_SOCKET_SEPARATOR);
-        if (pos == -1) {
-            throw new ParseException("Invalid rule", 0);
-        }
-
-        String orgAddrStr = rule.substring(0, pos).trim();
-        SocketMatcher matcher;
-        SocketAddress remoteAddress;
-        SocketAddressProvider bindAddressProvider = SocketAddressProviderBase.ANY_SOCKET_PROVIDER;
-
-        try {
-            String[] parts = StringUtils.split(orgAddrStr, ":");
-            if (parts.length != 2) {
-                throw new Exception();
-            }
-
-            IpAddress ip = null;
-            if (!"*".equals(parts[0])) {
-                ip = IpAddress.parse(parts[0]);
-            }
-
-            int port = -1;
-            if (!"*".equals(parts[1])) {
-                port = Integer.parseInt(parts[1]);
-                PortRange.validatePort(port);
-            }
-
-            matcher = new SimpleSocketMatcher(ip, port);
-        } catch (Exception e) {
-            throw new ParseException("Invalid rule.", 0);
-        }
-
-        pos += RULE_SOCKET_SEPARATOR.length();
-        String left = rule.substring(pos);
-        int bindPos = left.indexOf(RULE_BIND_SEPARATOR);
-        if (bindPos == -1) {
-            try {
-                remoteAddress = new SocketAddress(left.trim());
-            } catch (Exception e) {
-                throw new ParseException(e.getMessage(), pos);
-            }
-        } else {
-            String remoteAddrStr = left.substring(0, bindPos);
-            bindPos += RULE_BIND_SEPARATOR.length();
-
-            try {
-                remoteAddress = new SocketAddress(remoteAddrStr.trim());
-            } catch (Exception e) {
-                throw new ParseException(e.getMessage(), pos);
-            }
-
-            try {
-                String bindRuleStr = left.substring(bindPos).trim();
-                bindAddressProvider = SocketAddressProviderBase.create(bindRuleStr);
-            } catch (Exception e) {
-                throw new ParseException(e.getMessage(), bindPos);
-            }
-        }
-
-        rules.add(new Rule(matcher, remoteAddress, bindAddressProvider));
+        rules.add(parseRule(rule));
     }
 
     public void put(SessionInfo session, SocketAddress remoteAddress) {
@@ -137,62 +78,72 @@
 
     public static Rule parseRule(String rule) throws ParseException {
         Assert.notEmpty(rule, "rule");
-        int pos = rule.indexOf(RULE_SOCKET_SEPARATOR);
-        if (pos == -1) {
-            throw new ParseException("Invalid rule", 0);
-        }
-
-        String orgAddrStr = rule.substring(0, pos).trim();
-        SocketMatcher matcher;
-        SocketAddress remoteAddress;
         SocketAddressProvider bindAddressProvider = SocketAddressProviderBase.ANY_SOCKET_PROVIDER;
+        SocketAddress remoteAddress = null;
+        SocketMatcher matcher;
 
-        try {
-            String[] parts = StringUtils.split(orgAddrStr, ":");
-            if (parts.length != 2) {
-                throw new Exception();
-            }
+        int pos = rule.indexOf(RULE_SOCKET_SEPARATOR);
+        if (pos != -1) {
+            String orgAddrStr = rule.substring(0, pos).trim();
+            try {
+                String[] parts = StringUtils.split(orgAddrStr, ":");
+                if (parts.length != 2) {
+                    throw new Exception();
+                }
 
-            IpAddress ip = null;
-            if (!"*".equals(parts[0])) {
-                ip = IpAddress.parse(parts[0]);
+                IpAddress ip = null;
+                if (!"*".equals(parts[0])) {
+                    ip = IpAddress.parse(parts[0]);
+                }
+
+                int port = -1;
+                if (!"*".equals(parts[1])) {
+                    port = Integer.parseInt(parts[1]);
+                    PortRange.validatePort(port);
+                }
+
+                matcher = new SimpleSocketMatcher(ip, port);
+            } catch (Exception e) {
+                throw new ParseException("Invalid rule.", 0);
             }
 
-            int port = -1;
-            if (!"*".equals(parts[1])) {
-                port = Integer.parseInt(parts[1]);
-                PortRange.validatePort(port);
-            }
+            pos += RULE_SOCKET_SEPARATOR.length();
+            String left = rule.substring(pos);
+            int bindPos = left.indexOf(RULE_BIND_SEPARATOR);
+            if (bindPos == -1) {
+                try {
+                    remoteAddress = new SocketAddress(left.trim());
+                } catch (Exception e) {
+                    throw new ParseException(e.getMessage(), pos);
+                }
+            } else {
+                String remoteAddrStr = left.substring(0, bindPos);
+                bindPos += RULE_BIND_SEPARATOR.length();
 
-            matcher = new SimpleSocketMatcher(ip, port);
-        } catch (Exception e) {
-            throw new ParseException("Invalid rule.", 0);
-        }
+                try {
+                    remoteAddress = new SocketAddress(remoteAddrStr.trim());
+                } catch (Exception e) {
+                    throw new ParseException(e.getMessage(), pos);
+                }
 
-        pos += RULE_SOCKET_SEPARATOR.length();
-        String left = rule.substring(pos);
-        int bindPos = left.indexOf(RULE_BIND_SEPARATOR);
-        if (bindPos == -1) {
-            try {
-                remoteAddress = new SocketAddress(left.trim());
-            } catch (Exception e) {
-                throw new ParseException(e.getMessage(), pos);
+                try {
+                    String bindRuleStr = left.substring(bindPos).trim();
+                    bindAddressProvider = SocketAddressProviderBase.create(bindRuleStr);
+                } catch (Exception e) {
+                    throw new ParseException(e.getMessage(), bindPos);
+                }
             }
         } else {
-            String remoteAddrStr = left.substring(0, bindPos);
-            bindPos += RULE_BIND_SEPARATOR.length();
-
-            try {
-                remoteAddress = new SocketAddress(remoteAddrStr.trim());
-            } catch (Exception e) {
-                throw new ParseException(e.getMessage(), pos);
+            matcher = SimpleSocketMatcher.ALL_MATCH;
+            if (!rule.startsWith(RULE_BIND_SEPARATOR)) {
+                throw new ParseException("Invalid rule.", 0);
             }
 
             try {
-                String bindRuleStr = left.substring(bindPos).trim();
+                String bindRuleStr = rule.substring(RULE_BIND_SEPARATOR.length()).trim();
                 bindAddressProvider = SocketAddressProviderBase.create(bindRuleStr);
             } catch (Exception e) {
-                throw new ParseException(e.getMessage(), bindPos);
+                throw new ParseException(e.getMessage(), RULE_BIND_SEPARATOR.length());
             }
         }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java	Tue Aug 22 15:03:36 2017 +0200
@@ -22,12 +22,15 @@
 
         private final SocketAddress bindAddress;
 
+        public ConnectionParams() {
+            this(null);
+        }
+
         public ConnectionParams(SocketAddress remoteAddress) {
             this(remoteAddress, ANY_SOCKET);
         }
 
         public ConnectionParams(SocketAddress remoteAddress, SocketAddress bindAddress) {
-            Assert.notNull(remoteAddress, "remoteAddress");
             Assert.notNull(bindAddress, "bindAddress");
             this.remoteAddress = remoteAddress;
             this.bindAddress = bindAddress;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SimpleSocketMatcher.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SimpleSocketMatcher.java	Tue Aug 22 15:03:36 2017 +0200
@@ -7,7 +7,9 @@
  * @author mikolaj.podbielski
  */
 public class SimpleSocketMatcher implements SocketMatcher {
-    
+
+    public static final SimpleSocketMatcher ALL_MATCH = new SimpleSocketMatcher(null, -1);
+
     final IpAddress ip;
     final int port;
 
@@ -26,5 +28,5 @@
         }
         return true;
     }
-    
+
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Tue Aug 22 15:03:36 2017 +0200
@@ -53,6 +53,16 @@
     }
 
     @Override
+    public boolean isConnected() {
+        return channel.isConnected();
+    }
+
+    @Override
+    public boolean isConnectionPending() {
+        return channel.isConnectionPending();
+    }
+
+    @Override
     public void write(byte[] data, int offset, int length) throws IOException {
         writeToPipe(ByteBuffer.wrap(data, offset, length));
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Tue Aug 22 15:03:36 2017 +0200
@@ -175,8 +175,12 @@
             }
 
             Pipe pipe = Pipe.open();
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
 
-            NioChannelContext channelContext = new NioChannelContext(this, channel, connParams.getRemoteAddress(), sessionInfo, pipe);
+            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo, pipe);
             KeyContext keyContext = new KeyContext(channelContext, handler);
             SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
             try {
@@ -187,7 +191,7 @@
 
             channelContext.selectionKey(key);
             try {
-                channel.connect(AddressUtils.socketAddressToJdkSocket(connParams.getRemoteAddress()));
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
             } catch (Exception ex) {
                 doCatchException(key, ex);
                 return;
--- a/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java	Mon Aug 21 17:13:41 2017 +0200
+++ b/stress-tester/src/test/java/com/passus/st/emitter/RuleBasedSessionMapperTest.java	Tue Aug 22 15:03:36 2017 +0200
@@ -49,6 +49,7 @@
         assertMatchRule(" 1.1.1.1:80->2.2.2.2:90 ", new SocketAddress("2.2.2.2:90"), SessionMapper.ANY_SOCKET);
 
         assertMatchRule("1.1.1.1:80->2.2.2.2:90 bind 172.16.40.10", new SocketAddress("2.2.2.2:90"), new SocketAddress("172.16.40.10", 0));
+        assertMatchRule("bind 172.16.40.10", null, new SocketAddress("172.16.40.10", 0));
         assertMatchRule("1.1.1.1:80->2.2.2.2:90 bind 172.16.40.10:*", new SocketAddress("2.2.2.2:90"), new SocketAddress("172.16.40.10", 1));
         assertMatchRule("1.1.1.1:80->2.2.2.2:90 bind 172.16.40.10:2001-6000", new SocketAddress("2.2.2.2:90"), new SocketAddress("172.16.40.10", 2001));
         assertMatchRule("1.1.1.1:80->2.2.2.2:90 bind 172.16.40.10/31:2001-6000", new SocketAddress("2.2.2.2:90"), new SocketAddress("172.16.40.10", 2001));
@@ -63,11 +64,13 @@
     @Test
     public void testBind() throws Exception {
         List<SocketAddress> expected1 = Arrays.asList(sa("172.16.40.8", 0), sa("172.16.40.9", 0), sa("172.16.40.8", 0));
+        testBind("bind 172.16.40.8/31", expected1);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8/31", expected1);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8,172.16.40.9", expected1);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8-172.16.40.9", expected1);
 
         List<SocketAddress> expected2 = Arrays.asList(sa("172.16.40.8", 1001), sa("172.16.40.9", 1001), sa("172.16.40.8", 1002));
+        testBind("bind 172.16.40.8/31:1001-2000", expected2);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8/31:1001-2000", expected2);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8,172.16.40.9:1001-2000", expected2);
         testBind("*:* -> 2.2.2.2:90 bind 172.16.40.8-172.16.40.9:1001-2000", expected2);