changeset 573:1fb90938c654

HttpSynchClientWorker invalid session bugfix
author Devel 2
date Wed, 27 Sep 2017 15:47:49 +0200
parents b9420fb384f2
children f7c2e8285d7c
files stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java
diffstat 4 files changed, 141 insertions(+), 114 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Wed Sep 27 14:03:07 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java	Wed Sep 27 15:47:49 2017 +0200
@@ -8,6 +8,7 @@
 import com.passus.st.client.SessionEvent;
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.emitter.Emitter;
+import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
 import java.util.concurrent.LinkedBlockingDeque;
 
@@ -50,6 +51,20 @@
     }
 
     @Override
+    public void sessionInvalidated(SessionInfo session) throws Exception {
+        synchronized (lock) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Session {} invalidated.", session);
+            }
+
+            HttpFlowContext flowContext = flowContext(session);
+            if (flowContext != null) {
+                changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING);
+            }
+        }
+    }
+
+    @Override
     protected void flowStateChanged(HttpFlowContext context, int oldState) {
         if (logger.isDebugEnabled()) {
             logger.debug("flowStateChanged {},{}", context == currFlowContext, context.stateString());
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Wed Sep 27 14:03:07 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java	Wed Sep 27 15:47:49 2017 +0200
@@ -123,6 +123,10 @@
 
     public void errorCaught(Throwable cause) {
         String category = NetExceptionsCategory.errorToCategory(cause);
+        incErrorByCategory(category);
+    }
+
+    public void incErrorByCategory(String category) {
         MutableInt count = errors.get(category);
         if (count == null) {
             count = new MutableInt(1);
@@ -131,7 +135,7 @@
             count.increment();
         }
     }
-
+    
     @Override
     public boolean isActive() {
         return active;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Wed Sep 27 14:03:07 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Wed Sep 27 15:47:49 2017 +0200
@@ -13,6 +13,7 @@
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
@@ -30,61 +31,61 @@
  * @author Mirosław Hawrot
  */
 public class NioEmitterWorker extends Thread implements MetricSource {
-
+    
     private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class);
-
+    
     private int index;
-
+    
     private final Selector selector;
-
+    
     private volatile boolean working = false;
-
+    
     private long selectTimeout = 100;
-
+    
     private Queue<Task> tasks = new ConcurrentLinkedQueue<>();
-
+    
     private SessionMapper sessionMapper;
-
+    
     private volatile boolean collectMetrics = false;
-
+    
     private long connectionTimeout = 5_000;
-
+    
     private EmitterMetric metric;
-
+    
     NioEmitterWorker(int index) throws IOException {
         super("NioEmitterWorker-" + index);
         selector = Selector.open();
     }
-
+    
     int getIndex() {
         return index;
     }
-
+    
     public long getSelectTimeout() {
         return selectTimeout;
     }
-
+    
     public void setSelectTimeout(long selectTimeout) {
         Assert.greaterThanZero(selectTimeout, "selectTimeout");
         this.selectTimeout = selectTimeout;
     }
-
+    
     public long getConnectionTimeout() {
         return connectionTimeout;
     }
-
+    
     public void setConnectionTimeout(long connectionTimeout) {
         this.connectionTimeout = connectionTimeout;
     }
-
+    
     public SessionMapper getSessionMapper() {
         return sessionMapper;
     }
-
+    
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
     }
-
+    
     public void setWorking(boolean working) {
         this.working = working;
         if (this.working && !working) {
@@ -95,12 +96,12 @@
             }
         }
     }
-
+    
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
     }
-
+    
     @Override
     public void setCollectMetrics(boolean collectMetrics) {
         if (collectMetrics && metric == null) {
@@ -122,15 +123,15 @@
                     LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e);
                 }
             }
-
+            
             metric = null;
         }
     }
-
+    
     public boolean isWorking() {
         return working;
     }
-
+    
     @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
@@ -138,17 +139,17 @@
             metric.reset();
         }
     }
-
+    
     public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
         tasks.add(new ConnectTask(sessionInfo, handler));
         selector.wakeup();
     }
-
+    
     void flush(SelectionKey key) {
         tasks.add(new FlushTask(key));
         selector.wakeup();
     }
-
+    
     private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
         try {
             ConnectionParams connParams = sessionMapper.map(sessionInfo);
@@ -156,33 +157,39 @@
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("Unable to map session '" + sessionInfo + "'.");
                 }
-
+                
+                if (collectMetrics) {
+                    synchronized (metric) {
+                        metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
+                    }
+                }
+                
                 try {
                     handler.sessionInvalidated(sessionInfo);
                 } catch (Exception e) {
                     LOGGER.debug(e.getMessage(), e);
                 }
-
+                
                 return;
             }
-
+            
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'.");
             }
-
+            
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
-
+            
             SocketAddress bindAddress = connParams.getBindAddress();
             if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
                 channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
             }
-
+            
             SocketAddress remoteAddress = connParams.getRemoteAddress();
             if (remoteAddress == null) {
                 remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
             }
-
+            
             NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
             KeyContext keyContext = new KeyContext(channelContext, handler);
             SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
@@ -191,7 +198,7 @@
             } catch (Exception ex) {
                 doCatchException(key, ex);
             }
-
+            
             channelContext.selectionKey(key);
             try {
                 channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
@@ -199,7 +206,7 @@
                 doCatchException(key, ex);
                 return;
             }
-
+            
             selector.wakeup();
         } catch (Exception e) {
             if (collectMetrics) {
@@ -207,17 +214,17 @@
             }
             LOGGER.error(e.getMessage(), e);
         }
-
+        
     }
-
+    
     private void doFinishConnect(SelectionKey key) {
         SocketChannel channel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
-
+        
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
         }
-
+        
         try {
             long connStart = System.currentTimeMillis();
             boolean timeouted = false;
@@ -229,14 +236,14 @@
                 }
             }
             key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-
+            
             if (timeouted) {
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
                 }
-
+                
                 throw new ConnectException("Connection timed out.");
-
+                
             }
         } catch (Exception e) {
             doCatchException(key, e);
@@ -244,27 +251,27 @@
             if (collectMetrics) {
                 metric.incConnectionsErrors();
             }
-
+            
             return;
         }
-
+        
         try {
             if (collectMetrics) {
                 metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
                 metric.addBindSocket(keyContext.channelContext.getLocalAddress());
             }
-
+            
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
             }
-
+            
             keyContext.handler.channelActive(keyContext.channelContext);
             setOpRead(key);
         } catch (Exception ex) {
             LOGGER.error(ex.getMessage(), ex);
         }
     }
-
+    
     private void doWrite(SelectionKey key) {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
@@ -272,7 +279,7 @@
             LOGGER.debug("Writing ({} -> {}).",
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-
+        
         keyContext.handler.dataWriteStart(keyContext.channelContext);
         Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
         int written = 0;
@@ -282,16 +289,16 @@
                 buffer = queue.poll();
                 while (buffer.hasRemaining()) {
                     int res = socketChannel.write(buffer);
-
+                    
                     if (res == -1) {
                         doClose(key);
                         return;
                     }
-
+                    
                     if (collectMetrics) {
                         metric.updateSentBytes(res);
                     }
-
+                    
                     written += res;
                 }
             }
@@ -300,7 +307,7 @@
             doClose(key);
             return;
         }
-
+        
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Written {}B ({} -> {})", written,
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
@@ -313,24 +320,24 @@
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-
+        
         setOpRead(key);
         clearOpWrite(key);
     }
-
+    
     private void doRead(SelectionKey key) {
         SocketChannel channel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
-
+        
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Reading ({} -> {})",
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-
+        
         ByteBuffer buffer = keyContext.buffer;
-
+        
         buffer.clear();
-
+        
         ByteBuff buff = new HeapByteBuff();
         int totalReaded = 0;
         int readed;
@@ -340,7 +347,7 @@
                 buff.append(buffer.array(), buffer.position(), buffer.limit());
                 buffer.clear();
                 totalReaded += readed;
-
+                
                 if (collectMetrics) {
                     metric.updateReceivedBytes(readed);
                 }
@@ -350,12 +357,12 @@
             doClose(key);
             return;
         }
-
+        
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Readed {}B ({} -> {})", totalReaded,
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-
+        
         if (totalReaded > 0) {
             try {
                 keyContext.handler.dataReceived(keyContext.channelContext, buff);
@@ -364,117 +371,117 @@
                 LOGGER.debug(e.getMessage(), e);
             }
         }
-
+        
         if (readed == -1) {
             doClose(key);
             return;
         }
-
+        
         keyContext.buffer.flip();
     }
-
+    
     private void doCatchException(SelectionKey key, Throwable cause) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Error occured. " + cause.getMessage(), cause);
         }
-
+        
         if (collectMetrics) {
             synchronized (metric) {
                 metric.errorCaught(cause);
             }
         }
-
+        
         KeyContext keyContext = (KeyContext) key.attachment();
-
+        
         try {
             keyContext.handler.errorOccured(keyContext.channelContext, cause);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
     }
-
+    
     private void doClose(SelectionKey key) {
         if (!key.channel().isOpen()) {
             selector.wakeup();
             return;
         }
-
+        
         try {
             key.channel().close();
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-
+        
         KeyContext keyContext = (KeyContext) key.attachment();
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
         }
-
+        
         try {
             keyContext.handler.channelInactive(keyContext.channelContext);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-
+        
         key.cancel();
         try {
             keyContext.handler.channelUnregistered(keyContext.channelContext);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-
+        
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
         }
-
+        
         if (collectMetrics) {
             metric.incClosedConnections();
         }
-
+        
         selector.wakeup();
     }
-
+    
     static void setOpRead(SelectionKey key) {
         if (!key.isValid() || key.isReadable()) {
             return;
         }
-
+        
         key.interestOps(key.interestOps() | SelectionKey.OP_READ);
         key.selector().wakeup();
     }
-
+    
     static void clearOpRead(SelectionKey key) {
         if (!key.isValid() || !key.isReadable()) {
             return;
         }
-
+        
         key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
         key.selector().wakeup();
     }
-
+    
     static void setOpWrite(SelectionKey key) {
         if (!key.isValid() || key.isWritable()) {
             return;
         }
-
+        
         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
         key.selector().wakeup();
     }
-
+    
     static void clearOpWrite(SelectionKey key) {
         if (!key.isValid() || !key.isWritable()) {
             return;
         }
-
+        
         key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
         key.selector().wakeup();
     }
-
+    
     void requestClose(SelectionKey key) {
         tasks.add(new CloseTask(key));
         key.selector().wakeup();
     }
-
+    
     @Override
     public void run() {
         int selected = 0;
@@ -494,23 +501,23 @@
                     }
                 }
             }
-
+            
             try {
                 selected = selector.select(selectTimeout);
             } catch (IOException ex) {
                 LOGGER.warn(ex.getMessage(), ex);
             }
-
+            
             if (selected > 0) {
                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                 while (it.hasNext()) {
                     SelectionKey key = it.next();
                     it.remove();
-
+                    
                     if (!key.isValid()) {
                         continue;
                     }
-
+                    
                     if (key.isConnectable()) {
                         doFinishConnect(key);
                     } else if (key.isWritable()) {
@@ -522,68 +529,68 @@
             }
         }
     }
-
+    
     private static class KeyContext {
-
+        
         private final EmitterHandler handler;
-
+        
         private final NioChannelContext channelContext;
-
+        
         private ByteBuffer buffer = ByteBuffer.allocate(1024);
-
+        
         public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
             this.channelContext = channelContext;
             this.handler = handler;
         }
-
+        
     }
-
+    
     private static abstract class Task {
-
+        
         public static final int CLOSE = 1;
         public static final int CONNECT = 2;
         public static final int FLUSH = 3;
-
+        
         private final int code;
-
+        
         public Task(int code) {
             this.code = code;
         }
-
+        
     }
-
+    
     private final static class ConnectTask extends Task {
-
+        
         private final SessionInfo sessionInfo;
         private final EmitterHandler handler;
-
+        
         public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
             super(CONNECT);
             this.sessionInfo = sessionInfo;
             this.handler = handler;
         }
-
+        
     }
-
+    
     private final static class CloseTask extends Task {
-
+        
         private final SelectionKey key;
-
+        
         public CloseTask(SelectionKey key) {
             super(CLOSE);
             this.key = key;
         }
-
+        
     }
-
+    
     private final static class FlushTask extends Task {
-
+        
         private final SelectionKey key;
-
+        
         public FlushTask(SelectionKey key) {
             super(FLUSH);
             this.key = key;
         }
-
+        
     }
 }
--- a/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java	Wed Sep 27 14:03:07 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java	Wed Sep 27 15:47:49 2017 +0200
@@ -16,6 +16,7 @@
     public static final String CONNECTION_NO_ROUTE_TO_HOST = "connection.no_route_to_host";
     public static final String CONNECTION_UNKNWON = "connection.unknwon";
 
+    public static final String BIND_MAPPER_SESSION_INVALID = "bind.mapper_session_invalid";
     public static final String BIND_NETWORK_UNREACHABLE = "bind.network_unreachable";
     public static final String BIND_ADDRESS_ALREADY_IN_USE = "bind.address_already_in_use";
     public static final String BIND_UNKNWON = "bind.unknown";