Mercurial > stress-tester
changeset 972:6fc989064ecf
PcapSessionEventSource - Netflow broadcast in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Tue Jul 23 10:44:29 2019 +0200 @@ -43,6 +43,8 @@ private FlowHandler client; + private boolean bidirectional = true; + @Deprecated protected DataDecoder decoder; @@ -52,6 +54,14 @@ this.session = session; } + public boolean isBidirectional() { + return bidirectional; + } + + public void setBidirectional(boolean bidirectional) { + this.bidirectional = bidirectional; + } + public ChannelContext channelContext() { return channelContext; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Tue Jul 23 10:44:29 2019 +0200 @@ -2,9 +2,9 @@ import com.passus.st.client.dns.DnsFlowHandler; import com.passus.st.client.http.HttpFlowHandler; +import com.passus.st.client.netflow.NetflowFlowHandler; -import static com.passus.st.Protocols.DNS; -import static com.passus.st.Protocols.HTTP; +import static com.passus.st.Protocols.*; public class FlowHandlerFactoryImpl implements FlowHandlerFactory { @@ -15,6 +15,8 @@ return new HttpFlowHandler(); case DNS: return new DnsFlowHandler(); + case NETFLOW: + return new NetflowFlowHandler(); }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Tue Jul 23 10:44:29 2019 +0200 @@ -352,6 +352,7 @@ context.getRemoteAddress()); } + context.setBidirectional(flowContext.isBidirectional()); flowContext.channelContext(context); changeFlowState(flowContext, STATE_CONNECTED); } @@ -453,14 +454,13 @@ } lock.notifyAll(); - } } @Override public void errorOccurred(ChannelContext context, Throwable cause) throws Exception { if (logger.isDebugEnabled()) { - logger.debug("Error occured. " + cause.getMessage(), cause); + logger.debug("Error occurred. " + cause.getMessage(), cause); } synchronized (lock) { @@ -498,6 +498,9 @@ flowContext.sentEvent(event); flowContext.channelContext().writeAndFlush(buffer); buffer.clear(); + + + return true; } catch (Exception e) { if (logger.isDebugEnabled()) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandler.java Tue Jul 23 10:44:29 2019 +0200
@@ -0,0 +1,103 @@
+package com.passus.st.client.netflow;
+
+import com.passus.commons.Assert;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandler;
+import com.passus.st.client.FlowHandlerDataDecoder;
+import com.passus.st.client.FlowHandlerDataEncoder;
+import com.passus.st.metric.MetricsContainer;
+
+import static com.passus.st.Protocols.NETFLOW;
+
+/**
+ * {@link FlowHandler} for the NetFlow protocol.
+ * <p>
+ * It supplies a request encoder but no response decoder, and marks every flow
+ * it initialises as non-bidirectional.
+ */
+public class NetflowFlowHandler implements FlowHandler, TimeAware {
+
+    TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator();
+
+    boolean collectMetrics = false;
+
+    // Single encoder instance handed out for every flow context.
+    private final NetflowFlowHandlerDataEncoder encoder = new NetflowFlowHandlerDataEncoder();
+
+    /** Creates a handler with the default time generator and metrics collection off. */
+    public NetflowFlowHandler() {
+    }
+
+    /**
+     * Flags the given flow as unidirectional.
+     *
+     * @param flowContext flow being initialised
+     */
+    @Override
+    public void init(FlowContext flowContext) {
+        flowContext.setBidirectional(false);
+    }
+
+    /** @return the {@code NETFLOW} protocol identifier */
+    @Override
+    public int getProtocolId() {
+        return NETFLOW;
+    }
+
+    /**
+     * @param flowContext ignored
+     * @return the shared NetFlow encoder
+     */
+    @Override
+    public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) {
+        return this.encoder;
+    }
+
+    /**
+     * @param flowContext ignored
+     * @return always {@code null}; this handler provides no response decoder
+     */
+    @Override
+    public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) {
+        return null;
+    }
+
+    /** @return whether metrics collection is switched on */
+    @Override
+    public boolean isCollectMetrics() {
+        return this.collectMetrics;
+    }
+
+    /** @param collectMetrics new value of the metrics-collection flag */
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+        this.collectMetrics = collectMetrics;
+    }
+
+    /**
+     * Currently a no-op: nothing is written to the container.
+     *
+     * @param container ignored
+     */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+    }
+
+    /** @return the time generator currently in use */
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return this.timeGenerator;
+    }
+
+    /**
+     * @param timeGenerator replacement generator; passed through {@code Assert.notNull}
+     */
+    @Override
+    public void setTimeGenerator(TimeGenerator timeGenerator) {
+        Assert.notNull(timeGenerator, "timeGenerator");
+        this.timeGenerator = timeGenerator;
+    }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/netflow/NetflowFlowHandlerDataEncoder.java Tue Jul 23 10:44:29 2019 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.client.netflow;
+
+import com.passus.data.ByteBuff;
+import com.passus.net.netflow.*;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.FlowHandlerDataEncoder;
+
+/**
+ * Encodes {@link Netflow} messages into a {@link ByteBuff}, delegating to the
+ * version 5 or version 9 encoder according to the message's reported version.
+ */
+public class NetflowFlowHandlerDataEncoder implements FlowHandlerDataEncoder<Netflow> {
+
+    private final Netflow5Encoder netflow5Encoder = new Netflow5Encoder();
+
+    private final Netflow9Encoder netflow9Encoder = new Netflow9Encoder();
+
+    /**
+     * Writes the encoded form of {@code request} to {@code out}.
+     *
+     * @param request     message to encode; its version selects the encoder
+     * @param flowContext not used by this encoder
+     * @param out         buffer receiving the encoded bytes
+     * @throws IllegalArgumentException if the message version is neither 5 nor 9
+     */
+    @Override
+    public void encode(Netflow request, FlowContext flowContext, ByteBuff out) {
+        if (request.getVersion() == NetflowUtils.VERSION_5) {
+            netflow5Encoder.encode((Netflow5) request, out);
+        } else if (request.getVersion() == NetflowUtils.VERSION_9) {
+            netflow9Encoder.encode((Netflow9) request, out);
+        } else {
+            // Fail loudly: silently returning would leave `out` untouched and
+            // give the caller no way to tell that nothing was encoded.
+            // NOTE(review): confirm callers wrap encode() in their existing
+            // exception handling before relying on this.
+            throw new IllegalArgumentException(
+                    "Unsupported NetFlow version: " + request.getVersion());
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Tue Jul 23 10:44:29 2019 +0200 @@ -10,6 +10,10 @@ */ public interface ChannelContext { + boolean isBidirectional(); + + void setBidirectional(boolean bidirectional); + boolean isConnected(); boolean isConnectionPending();
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Tue Jul 23 10:44:29 2019 +0200 @@ -62,7 +62,6 @@ this(srcIp, srcPort, dstIp, dstPort, DEFAULT_TRANSPORT); } - public SessionInfo(IpAddress srcIp, int srcPort, IpAddress dstIp, int dstPort, int transport) { this(srcIp, srcPort, dstIp, dstPort, transport, UNKNOWN); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Tue Jul 23 10:44:29 2019 +0200 @@ -305,7 +305,10 @@ logger.debug(e.getMessage(), e); } - setOpRead(key); + if (channelContext.isBidirectional()) { + setOpRead(key); + } + clearOpWrite(key); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Tue Jul 23 10:44:29 2019 +0200 @@ -19,6 +19,8 @@ protected final T channel; + protected boolean bidirectional = true; + protected final Queue<ByteBuffer> dataQueue; protected SocketAddress localAddress; @@ -35,6 +37,16 @@ this.dataQueue = new LinkedList<>(); } + @Override + public boolean isBidirectional() { + return bidirectional; + } + + @Override + public void setBidirectional(boolean bidirectional) { + this.bidirectional = bidirectional; + } + public void selectionKey(SelectionKey key) { this.key = key; }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Tue Jul 23 10:44:29 2019 +0200 @@ -5,6 +5,7 @@ import com.passus.net.utils.AddressUtils; import com.passus.st.emitter.ChannelContext; import com.passus.st.emitter.SessionInfo; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -13,7 +14,6 @@ import java.util.Queue; /** - * * @author Mirosław Hawrot */ public class NioChannelContext2 implements ChannelContext { @@ -24,6 +24,8 @@ private final SocketChannel channel; + protected boolean bidirectional = true; + private final Queue<ByteBuffer> dataQueue; private SocketAddress localAddress; @@ -36,7 +38,7 @@ * Usunac */ public long regTime; - + public NioChannelContext2(NioEmitterWorker2 worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.worker = worker; this.channel = channel; @@ -46,6 +48,16 @@ } + @Override + public boolean isBidirectional() { + return bidirectional; + } + + @Override + public void setBidirectional(boolean bidirectional) { + this.bidirectional = bidirectional; + } + Queue<ByteBuffer> dataQueue() { return dataQueue; }
--- a/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/source/PcapNetflowSessionAnalyzerHook.java Tue Jul 23 10:44:29 2019 +0200 @@ -4,6 +4,7 @@ import com.passus.net.session.SessionAnalyzer; import static com.passus.st.Protocols.DNS; +import static com.passus.st.Protocols.NETFLOW; public class PcapNetflowSessionAnalyzerHook extends PcapSessionAnalyzerHook { @@ -17,7 +18,7 @@ PcapUnidirectionalAnalyzerListener listener = new PcapUnidirectionalAnalyzerListener(context.getSourceName(), context.getEventHandler(), context.isCollectMetric(), - context.getMetric(), DNS); + context.getMetric(), NETFLOW); analyzer.setListener(listener); context.getUdpProcessor().addAnalyzer(analyzer);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Tue Jul 23 10:44:29 2019 +0200 @@ -129,6 +129,8 @@ private SocketAddress remoteAddress; + protected boolean bidirectional = true; + public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.emitter = emitter; this.handler = handler; @@ -138,6 +140,16 @@ } @Override + public boolean isBidirectional() { + return bidirectional; + } + + @Override + public void setBidirectional(boolean unidirectional) { + this.bidirectional = unidirectional; + } + + @Override public boolean isConnected() { throw new UnsupportedOperationException("Not supported yet."); }
--- a/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java Mon Jul 22 15:46:58 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/EventUtils.java Tue Jul 23 10:44:29 2019 +0200 @@ -5,6 +5,8 @@ import com.passus.net.PortRangeSet; import com.passus.net.dns.session.DnsUdpSessionAnalyzer; import com.passus.net.http.session.HttpSessionAnalyzer; +import com.passus.net.netflow.NetflowUdpSessionAnalyzer; +import com.passus.net.session.SessionAnalyzer; import com.passus.st.Protocols; import com.passus.st.client.ArrayListEventHandler; import com.passus.st.client.Event; @@ -51,8 +53,9 @@ src.setAllowPartialSession(allowPartialSession); src.setPcapFile(pcapFile.getAbsolutePath()); + SessionAnalyzer analyzer; if (protocols.contains(Protocols.HTTP)) { - HttpSessionAnalyzer analyzer = new HttpSessionAnalyzer(); + analyzer = new HttpSessionAnalyzer(); if (portsRange != null) { analyzer.getPortsRange().addAll(portsRange); } @@ -60,7 +63,15 @@ } if (protocols.contains(Protocols.DNS)) { - DnsUdpSessionAnalyzer analyzer = new DnsUdpSessionAnalyzer(); + analyzer = new DnsUdpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); + } + + if (protocols.contains(Protocols.NETFLOW)) { + analyzer = new NetflowUdpSessionAnalyzer(); if (portsRange != null) { analyzer.getPortsRange().addAll(portsRange); }