changeset 605:5c0b5bfd2972

HttpDebugFilter
author Devel 1
date Mon, 09 Oct 2017 09:07:31 +0200
parents 4cbc75e90f7e
children 2e185909d22c
files stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpDebugFilter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java
diffstat 3 files changed, 176 insertions(+), 114 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java	Fri Oct 06 10:42:04 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java	Mon Oct 09 09:07:31 2017 +0200
@@ -49,6 +49,9 @@
 
     public void writeSilently(HttpMessage message, File file) {
         try {
+            if (file.exists()) {
+                LOGGER.warn("File '{}' already exists.", file.getAbsolutePath());
+            }
             write(message, file);
         } catch (IOException ex) {
             LOGGER.debug("Could not save file '{}'.", file.getAbsolutePath());
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpDebugFilter.java	Mon Oct 09 09:07:31 2017 +0200
@@ -0,0 +1,59 @@
+package com.passus.st.client.http.filter;
+
+import com.passus.commons.annotations.Plugin;
+import com.passus.config.annotations.NodeDefinitionCreate;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
+import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
+import com.passus.st.client.http.HttpFlowContext;
+import com.passus.st.plugin.PluginConstants;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Diagnostic HTTP filter: logs each outbound/inbound request-response pair at WARN, always returns DUNNO.
+ * @author mikolaj.podbielski
+ */
+@Plugin(name = HttpDebugFilter.TYPE, category = PluginConstants.CATEGORY_HTTP_FILTER)
+@NodeDefinitionCreate(HttpDebugFilter.EmptyDefCreator.class)
+public class HttpDebugFilter extends HttpFilter {
+    private static final Logger LOGGER = LogManager.getLogger(HttpDebugFilter.class);
+    public static final String TYPE = "debug";
+
+    @Override
+    public int filterOutbound(HttpRequest req, HttpResponse resp, HttpFlowContext context) {
+        LOGGER.warn("OUT {} {} @{}", info(req), info(resp), System.identityHashCode(context));
+        return DUNNO; // log only; neither message is modified here
+    }
+
+    @Override
+    public int filterInbound(HttpRequest req, HttpResponse resp, HttpFlowContext context) {
+        LOGGER.warn("IN {} {} @{}", info(req), info(resp), System.identityHashCode(context));
+        return DUNNO; // log only; neither message is modified here
+    }
+
+    @Override
+    public HttpDebugFilter instanceForWorker(int index) {
+        return new HttpDebugFilter(); // the filter keeps no per-instance state, so the index is unused
+    }
+
+    /** Describes a request as {@code [id @identityHash url]}, or {@code [null]} when {@code req} is null. */
+    public static String info(HttpRequest req) {
+        return req == null ? "[null]" : String.format("[%s @%d %s]", req.getId(), System.identityHashCode(req), req.getUrl());
+    }
+
+    /** Describes a response as {@code [statusCode @identityHash]}, or {@code [null]} when {@code resp} is null. */
+    private static String info(HttpResponse resp) {
+        return resp == null ? "[null]" : String.format("[%d @%d]", resp.getStatus().getCode(), System.identityHashCode(resp));
+    }
+
+    /** Node-definition creator named in {@code @NodeDefinitionCreate}; returns {@code mapDef()} with no arguments. */
+    public static class EmptyDefCreator implements NodeDefinitionCreator {
+        @Override
+        public NodeDefinition create() {
+            return mapDef();
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Fri Oct 06 10:42:04 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java	Mon Oct 09 09:07:31 2017 +0200
@@ -31,61 +31,61 @@
  * @author Mirosław Hawrot
  */
 public class NioEmitterWorker extends Thread implements MetricSource {
-    
+
     private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class);
-    
+
     private int index;
-    
+
     private final Selector selector;
-    
+
     private volatile boolean working = false;
-    
+
     private long selectTimeout = 100;
-    
+
     private Queue<Task> tasks = new ConcurrentLinkedQueue<>();
-    
+
     private SessionMapper sessionMapper;
-    
+
     private volatile boolean collectMetrics = false;
-    
+
     private long connectionTimeout = 5_000;
-    
+
     private EmitterMetric metric;
-    
+
     NioEmitterWorker(int index) throws IOException {
         super("NioEmitterWorker-" + index);
         selector = Selector.open();
     }
-    
+
     int getIndex() {
         return index;
     }
-    
+
     public long getSelectTimeout() {
         return selectTimeout;
     }
-    
+
     public void setSelectTimeout(long selectTimeout) {
         Assert.greaterThanZero(selectTimeout, "selectTimeout");
         this.selectTimeout = selectTimeout;
     }
-    
+
     public long getConnectionTimeout() {
         return connectionTimeout;
     }
-    
+
     public void setConnectionTimeout(long connectionTimeout) {
         this.connectionTimeout = connectionTimeout;
     }
-    
+
     public SessionMapper getSessionMapper() {
         return sessionMapper;
     }
-    
+
     public void setSessionMapper(SessionMapper sessionMapper) {
         this.sessionMapper = sessionMapper;
     }
-    
+
     public void setWorking(boolean working) {
         this.working = working;
         if (this.working && !working) {
@@ -96,12 +96,12 @@
             }
         }
     }
-    
+
     @Override
     public boolean isCollectMetrics() {
         return collectMetrics;
     }
-    
+
     @Override
     public void setCollectMetrics(boolean collectMetrics) {
         if (collectMetrics && metric == null) {
@@ -123,15 +123,15 @@
                     LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e);
                 }
             }
-            
+
             metric = null;
         }
     }
-    
+
     public boolean isWorking() {
         return working;
     }
-    
+
     @Override
     public void writeMetrics(MetricsContainer container) {
         if (collectMetrics) {
@@ -139,17 +139,17 @@
             metric.reset();
         }
     }
-    
+
     public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
         tasks.add(new ConnectTask(sessionInfo, handler));
         selector.wakeup();
     }
-    
+
     void flush(SelectionKey key) {
         tasks.add(new FlushTask(key));
         selector.wakeup();
     }
-    
+
     private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
         try {
             ConnectionParams connParams = sessionMapper.map(sessionInfo);
@@ -157,39 +157,39 @@
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("Unable to map session '" + sessionInfo + "'.");
                 }
-                
+
                 if (collectMetrics) {
                     synchronized (metric) {
                         metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
                     }
                 }
-                
+
                 try {
                     handler.sessionInvalidated(sessionInfo);
                 } catch (Exception e) {
                     LOGGER.debug(e.getMessage(), e);
                 }
-                
+
                 return;
             }
-            
+
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'.");
             }
-            
+
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
-            
+
             SocketAddress bindAddress = connParams.getBindAddress();
             if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
                 channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
             }
-            
+
             SocketAddress remoteAddress = connParams.getRemoteAddress();
             if (remoteAddress == null) {
                 remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
             }
-            
+
             NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
             KeyContext keyContext = new KeyContext(channelContext, handler);
             SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
@@ -198,7 +198,7 @@
             } catch (Exception ex) {
                 doCatchException(key, ex);
             }
-            
+
             channelContext.selectionKey(key);
             try {
                 channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
@@ -206,7 +206,7 @@
                 doCatchException(key, ex);
                 return;
             }
-            
+
             selector.wakeup();
         } catch (Exception e) {
             if (collectMetrics) {
@@ -214,17 +214,17 @@
             }
             LOGGER.error(e.getMessage(), e);
         }
-        
+
     }
-    
+
     private void doFinishConnect(SelectionKey key) {
         SocketChannel channel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
-        
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
         }
-        
+
         try {
             long connStart = System.currentTimeMillis();
             boolean timeouted = false;
@@ -236,14 +236,14 @@
                 }
             }
             key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-            
+
             if (timeouted) {
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
                 }
-                
+
                 throw new ConnectException("Connection timed out.");
-                
+
             }
         } catch (Exception e) {
             doCatchException(key, e);
@@ -251,27 +251,27 @@
             if (collectMetrics) {
                 metric.incConnectionsErrors();
             }
-            
+
             return;
         }
-        
+
         try {
             if (collectMetrics) {
                 metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
                 metric.addBindSocket(keyContext.channelContext.getLocalAddress());
             }
-            
+
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
             }
-            
+
             keyContext.handler.channelActive(keyContext.channelContext);
             setOpRead(key);
         } catch (Exception ex) {
             LOGGER.error(ex.getMessage(), ex);
         }
     }
-    
+
     private void doWrite(SelectionKey key) {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
@@ -279,7 +279,7 @@
             LOGGER.debug("Writing ({} -> {}).",
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-        
+
         keyContext.handler.dataWriteStart(keyContext.channelContext);
         Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
         int written = 0;
@@ -289,16 +289,16 @@
                 buffer = queue.poll();
                 while (buffer.hasRemaining()) {
                     int res = socketChannel.write(buffer);
-                    
+
                     if (res == -1) {
                         doClose(key);
                         return;
                     }
-                    
+
                     if (collectMetrics) {
                         metric.updateSentBytes(res);
                     }
-                    
+
                     written += res;
                 }
             }
@@ -307,7 +307,7 @@
             doClose(key);
             return;
         }
-        
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Written {}B ({} -> {})", written,
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
@@ -320,24 +320,24 @@
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-        
+
         setOpRead(key);
         clearOpWrite(key);
     }
-    
+
     private void doRead(SelectionKey key) {
         SocketChannel channel = (SocketChannel) key.channel();
         KeyContext keyContext = (KeyContext) key.attachment();
-        
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Reading ({} -> {})",
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-        
+
         ByteBuffer buffer = keyContext.buffer;
-        
+
         buffer.clear();
-        
+
         ByteBuff buff = new HeapByteBuff();
         int totalReaded = 0;
         int readed;
@@ -347,7 +347,7 @@
                 buff.append(buffer.array(), buffer.position(), buffer.limit());
                 buffer.clear();
                 totalReaded += readed;
-                
+
                 if (collectMetrics) {
                     metric.updateReceivedBytes(readed);
                 }
@@ -357,12 +357,12 @@
             doClose(key);
             return;
         }
-        
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Readed {}B ({} -> {})", totalReaded,
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
-        
+
         if (totalReaded > 0) {
             try {
                 keyContext.handler.dataReceived(keyContext.channelContext, buff);
@@ -371,117 +371,117 @@
                 LOGGER.debug(e.getMessage(), e);
             }
         }
-        
+
         if (readed == -1) {
             doClose(key);
             return;
         }
-        
+
         keyContext.buffer.flip();
     }
-    
+
     private void doCatchException(SelectionKey key, Throwable cause) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Error occured. " + cause.getMessage(), cause);
         }
-        
+
         if (collectMetrics) {
             synchronized (metric) {
                 metric.errorCaught(cause);
             }
         }
-        
+
         KeyContext keyContext = (KeyContext) key.attachment();
-        
+
         try {
             keyContext.handler.errorOccured(keyContext.channelContext, cause);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
     }
-    
+
     private void doClose(SelectionKey key) {
         if (!key.channel().isOpen()) {
             selector.wakeup();
             return;
         }
-        
+
         try {
             key.channel().close();
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-        
+
         KeyContext keyContext = (KeyContext) key.attachment();
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
         }
-        
+
         try {
             keyContext.handler.channelInactive(keyContext.channelContext);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-        
+
         key.cancel();
         try {
             keyContext.handler.channelUnregistered(keyContext.channelContext);
         } catch (Exception e) {
             LOGGER.debug(e.getMessage(), e);
         }
-        
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
         }
-        
+
         if (collectMetrics) {
             metric.incClosedConnections();
         }
-        
+
         selector.wakeup();
     }
-    
+
     static void setOpRead(SelectionKey key) {
         if (!key.isValid() || key.isReadable()) {
             return;
         }
-        
+
         key.interestOps(key.interestOps() | SelectionKey.OP_READ);
         key.selector().wakeup();
     }
-    
+
     static void clearOpRead(SelectionKey key) {
         if (!key.isValid() || !key.isReadable()) {
             return;
         }
-        
+
         key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
         key.selector().wakeup();
     }
-    
+
     static void setOpWrite(SelectionKey key) {
         if (!key.isValid() || key.isWritable()) {
             return;
         }
-        
+
         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
         key.selector().wakeup();
     }
-    
+
     static void clearOpWrite(SelectionKey key) {
         if (!key.isValid() || !key.isWritable()) {
             return;
         }
-        
+
         key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
         key.selector().wakeup();
     }
-    
+
     void requestClose(SelectionKey key) {
         tasks.add(new CloseTask(key));
         key.selector().wakeup();
     }
-    
+
     @Override
     public void run() {
         int selected = 0;
@@ -501,23 +501,23 @@
                     }
                 }
             }
-            
+
             try {
                 selected = selector.select(selectTimeout);
             } catch (IOException ex) {
                 LOGGER.warn(ex.getMessage(), ex);
             }
-            
+
             if (selected > 0) {
                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                 while (it.hasNext()) {
                     SelectionKey key = it.next();
                     it.remove();
-                    
+
                     if (!key.isValid()) {
                         continue;
                     }
-                    
+
                     if (key.isConnectable()) {
                         doFinishConnect(key);
                     } else if (key.isWritable()) {
@@ -529,68 +529,68 @@
             }
         }
     }
-    
+
     private static class KeyContext {
-        
+
         private final EmitterHandler handler;
-        
+
         private final NioChannelContext channelContext;
-        
+
         private ByteBuffer buffer = ByteBuffer.allocate(1024);
-        
+
         public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
             this.channelContext = channelContext;
             this.handler = handler;
         }
-        
+
     }
-    
+
     private static abstract class Task {
-        
+
         public static final int CLOSE = 1;
         public static final int CONNECT = 2;
         public static final int FLUSH = 3;
-        
+
         private final int code;
-        
+
         public Task(int code) {
             this.code = code;
         }
-        
+
     }
-    
+
     private final static class ConnectTask extends Task {
-        
+
         private final SessionInfo sessionInfo;
         private final EmitterHandler handler;
-        
+
         public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
             super(CONNECT);
             this.sessionInfo = sessionInfo;
             this.handler = handler;
         }
-        
+
     }
-    
+
     private final static class CloseTask extends Task {
-        
+
         private final SelectionKey key;
-        
+
         public CloseTask(SelectionKey key) {
             super(CLOSE);
             this.key = key;
         }
-        
+
     }
-    
+
     private final static class FlushTask extends Task {
-        
+
         private final SelectionKey key;
-        
+
         public FlushTask(SelectionKey key) {
             super(FLUSH);
             this.key = key;
         }
-        
+
     }
 }