changeset 972:6fc989064ecf

PcapSessionEventSource - Netflow broadcast in progress
author Devel 2
date Tue, 23 Jul 2019 10:44:29 +0200
parents b429501707ca
children 641a1a8bcb12
files stress-tester/src/main/java/com/passus/st/client/FlowContext.java stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java stress-tester/src/test/java/com/passus/st/utils/EventUtils.java
diffstat 13 files changed, 175 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java	Tue Jul 23 10:44:29 2019 +0200
@@ -43,6 +43,8 @@
 
     private FlowHandler client;
 
+    private boolean bidirectional = true;
+
     @Deprecated
     protected DataDecoder decoder;
 
@@ -52,6 +54,14 @@
         this.session = session;
     }
 
+    public boolean isBidirectional() {
+        return bidirectional;
+    }
+
+    public void setBidirectional(boolean bidirectional) {
+        this.bidirectional = bidirectional;
+    }
+
     public ChannelContext channelContext() {
         return channelContext;
     }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java	Tue Jul 23 10:44:29 2019 +0200
@@ -2,9 +2,9 @@
 
 import com.passus.st.client.dns.DnsFlowHandler;
 import com.passus.st.client.http.HttpFlowHandler;
+import com.passus.st.client.netflow.NetflowFlowHandler;
 
-import static com.passus.st.Protocols.DNS;
-import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.Protocols.*;
 
 public class FlowHandlerFactoryImpl implements FlowHandlerFactory {
 
@@ -15,6 +15,8 @@
                 return new HttpFlowHandler();
             case DNS:
                 return new DnsFlowHandler();
+            case NETFLOW:
+                return new NetflowFlowHandler();
 
         }
 
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java	Tue Jul 23 10:44:29 2019 +0200
@@ -352,6 +352,7 @@
                             context.getRemoteAddress());
                 }
 
+                context.setBidirectional(flowContext.isBidirectional());
                 flowContext.channelContext(context);
                 changeFlowState(flowContext, STATE_CONNECTED);
             }
@@ -453,14 +454,13 @@
             }
 
             lock.notifyAll();
-
         }
     }
 
     @Override
     public void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
         if (logger.isDebugEnabled()) {
-            logger.debug("Error occured. " + cause.getMessage(), cause);
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
         }
 
         synchronized (lock) {
@@ -498,6 +498,9 @@
                     flowContext.sentEvent(event);
                     flowContext.channelContext().writeAndFlush(buffer);
                     buffer.clear();
+
+
+
                     return true;
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java	Tue Jul 23 10:44:29 2019 +0200
@@ -0,0 +1,73 @@
+package com.passus.st.client.netflow;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandler;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.NETFLOW;
+
+public class NetflowFlowHandler implements FlowHandler, TimeAware {
+
+    private final NetflowFlowHandlerDataEncoder encoder;
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    public NetflowFlowHandler() {
+        this.encoder = new NetflowFlowHandlerDataEncoder();
+    }
+
+    @Override
+    public int getProtocolId() {
+        return NETFLOW;
+    }
+
+    @Override
+    public boolean isCollectMetrics() {
+        return collectMetrics;
+    }
+
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return null;
+    }
+
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return encoder;
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+
+    }
+
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+    @Override
+    public void init(FlowContext flowContext) {
+        flowContext.setBidirectional(false);
+    }
+
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandlerDataEncoder.java	Tue Jul 23 10:44:29 2019 +0200
@@ -0,0 +1,22 @@
+package com.passus.st.client.netflow;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.netflow.*;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+public class NetflowFlowHandlerDataEncoder implements FlowHandlerDataEncoder<Netflow> {
+
+    private final Netflow5Encoder netflow5Encoder = new Netflow5Encoder();
+
+    private final Netflow9Encoder netflow9Encoder = new Netflow9Encoder();
+
+    @Override
+    public void encode(Netflow request, FlowContext flowContext, ByteBuff out) {
+        if (request.getVersion() == NetflowUtils.VERSION_5) {
+            netflow5Encoder.encode((Netflow5) request, out);
+        } else if (request.getVersion() == NetflowUtils.VERSION_9) {
+            netflow9Encoder.encode((Netflow9) request, out);
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Tue Jul 23 10:44:29 2019 +0200
@@ -10,6 +10,10 @@
  */
 public interface ChannelContext {
 
+    boolean isBidirectional();
+
+    void setBidirectional(boolean bidirectional);
+
     boolean isConnected();
 
     boolean isConnectionPending();
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Tue Jul 23 10:44:29 2019 +0200
@@ -62,7 +62,6 @@
         this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT);
     }
 
-
     public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport) {
         this(srcIp, srcPort, dstIp, dstPort, transport, UNKNOWN);
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Tue Jul 23 10:44:29 2019 +0200
@@ -305,7 +305,10 @@
             logger.debug(e.getMessage(), e);
         }
 
-        setOpRead(key);
+        if (channelContext.isBidirectional()) {
+            setOpRead(key);
+        }
+
         clearOpWrite(key);
     }
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Tue Jul 23 10:44:29 2019 +0200
@@ -19,6 +19,8 @@
 
     protected final T channel;
 
+    protected boolean bidirectional = true;
+
     protected final Queue<ByteBuffer> dataQueue;
 
     protected SocketAddress localAddress;
@@ -35,6 +37,16 @@
         this.dataQueue = new LinkedList<>();
     }
 
+    @Override
+    public boolean isBidirectional() {
+        return bidirectional;
+    }
+
+    @Override
+    public void setBidirectional(boolean bidirectional) {
+        this.bidirectional = bidirectional;
+    }
+
     public void selectionKey(SelectionKey key) {
         this.key = key;
     }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java	Tue Jul 23 10:44:29 2019 +0200
@@ -5,6 +5,7 @@
 import com.passus.net.utils.AddressUtils;
 import com.passus.st.emitter.ChannelContext;
 import com.passus.st.emitter.SessionInfo;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
@@ -13,7 +14,6 @@
 import java.util.Queue;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class NioChannelContext2 implements ChannelContext {
@@ -24,6 +24,8 @@
 
     private final SocketChannel channel;
 
+    protected boolean bidirectional = true;
+
     private final Queue<ByteBuffer> dataQueue;
 
     private SocketAddress localAddress;
@@ -36,7 +38,7 @@
      * Usunac
      */
     public long regTime;
-    
+
     public NioChannelContext2(NioEmitterWorker2 worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
         this.worker = worker;
         this.channel = channel;
@@ -46,6 +48,16 @@
 
     }
 
+    @Override
+    public boolean isBidirectional() {
+        return bidirectional;
+    }
+
+    @Override
+    public void setBidirectional(boolean bidirectional) {
+        this.bidirectional = bidirectional;
+    }
+
     Queue<ByteBuffer> dataQueue() {
         return dataQueue;
     }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java	Tue Jul 23 10:44:29 2019 +0200
@@ -4,6 +4,7 @@
 import com.passus.net.session.SessionAnalyzer;
 
 import static com.passus.st.Protocols.DNS;
+import static com.passus.st.Protocols.NETFLOW;
 
 public class PcapNetflowSessionAnalyzerHook extends PcapSessionAnalyzerHook {
 
@@ -17,7 +18,7 @@
         PcapUnidirectionalAnalyzerListener listener = new PcapUnidirectionalAnalyzerListener(context.getSourceName(),
                 context.getEventHandler(),
                 context.isCollectMetric(),
-                context.getMetric(), DNS);
+                context.getMetric(), NETFLOW);
         analyzer.setListener(listener);
 
         context.getUdpProcessor().addAnalyzer(analyzer);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java	Tue Jul 23 10:44:29 2019 +0200
@@ -129,6 +129,8 @@
 
         private SocketAddress remoteAddress;
 
+        protected boolean bidirectional = true;
+
         public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) {
             this.emitter = emitter;
             this.handler = handler;
@@ -138,6 +140,16 @@
         }
 
         @Override
+        public boolean isBidirectional() {
+            return bidirectional;
+        }
+
+        @Override
+        public void setBidirectional(boolean unidirectional) {
+            this.bidirectional = unidirectional;
+        }
+
+        @Override
         public boolean isConnected() {
             throw new UnsupportedOperationException("Not supported yet.");
         }
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java	Mon Jul 22 15:46:58 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java	Tue Jul 23 10:44:29 2019 +0200
@@ -5,6 +5,8 @@
 import com.passus.net.PortRangeSet;
 import com.passus.net.dns.session.DnsUdpSessionAnalyzer;
 import com.passus.net.http.session.HttpSessionAnalyzer;
+import com.passus.net.netflow.NetflowUdpSessionAnalyzer;
+import com.passus.net.session.SessionAnalyzer;
 import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
@@ -51,8 +53,9 @@
             src.setAllowPartialSession(allowPartialSession);
             src.setPcapFile(pcapFile.getAbsolutePath());
 
+            SessionAnalyzer analyzer;
             if (protocols.contains(Protocols.HTTP)) {
-                HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer();
+                analyzer = new HttpSessionAnalyzer();
                 if (portsRange != null) {
                     analyzer.getPortsRange().addAll(portsRange);
                 }
@@ -60,7 +63,15 @@
             }
 
             if (protocols.contains(Protocols.DNS)) {
-                DnsUdpSessionAnalyzer analyzer = new DnsUdpSessionAnalyzer();
+                analyzer = new DnsUdpSessionAnalyzer();
+                if (portsRange != null) {
+                    analyzer.getPortsRange().addAll(portsRange);
+                }
+                src.addAnalyzer(analyzer);
+            }
+
+            if (protocols.contains(Protocols.NETFLOW)) {
+                analyzer = new NetflowUdpSessionAnalyzer();
                 if (portsRange != null) {
                     analyzer.getPortsRange().addAll(portsRange);
                 }