Mercurial > stress-tester
changeset 1029:e47bfb487cfd
UnidirectionalUdpPcapEmitterTest.testSend + bugfix
author | Devel 1 |
---|---|
date | Thu, 02 Apr 2020 10:26:17 +0200 |
parents | 056fa28115ff |
children | 170c8ce25bef |
files | stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java stress-tester/src/main/java/com/passus/st/utils/EventUtils.java stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java |
diffstat | 3 files changed, 157 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Thu Apr 02 08:52:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/pcap/UnidirectionalPcapWorker.java Thu Apr 02 10:26:17 2020 +0200 @@ -210,6 +210,7 @@ } localMac = new MACAddress(localHwAddress); device = getPcapDeviceName(networkInterface); + LOGGER.debug("Found device " + device + " for ifc " + networkInterface); } catch (IOException ex) { doCatchException(channelContext, ex); return; @@ -222,7 +223,6 @@ MACAddress remoteMac = macResolver.resolve(remoteAddress.getIp()); channelContext = new UnidirectionalPcapChannelContext(this, handler, sessionInfo, localAddress, remoteAddress, localMac, remoteMac); - sessions.put(sessionInfo, channelContext); try { @@ -266,7 +266,7 @@ return; } - Queue<ByteBuffer> queue = channelContext.dataQueue; + Queue<byte[]> queue = channelContext.dataQueue; if (queue.isEmpty()) { return; } @@ -280,11 +280,15 @@ int written = 0; try { Pcap pcap = channelContext.getPcap(); - int length = 0; - if (pcap.sendPacket(length, channelContext.buffer) == 0) { - written = length; - } else { - throw new IOException("Unable to send packet. Pcap error."); + + while (!queue.isEmpty()) { + byte[] buf = queue.poll(); + int res = pcap.sendPacket(buf.length, buf); + if (res == 0) { + written += buf.length; + } else { + throw new IOException("Unable to send packet. Pcap error: " + pcap.getErr()); + } } } catch (Exception e) { doCatchException(channelContext, e);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/utils/EventUtils.java Thu Apr 02 10:26:17 2020 +0200 @@ -0,0 +1,113 @@ +package com.passus.st.utils; + +import com.passus.commons.utils.ArrayUtils; +import com.passus.commons.utils.ResourceUtils; +import com.passus.net.PortRangeSet; +import com.passus.net.dns.session.DnsUdpSessionAnalyzer; +import com.passus.net.http.session.HttpSessionAnalyzer; +import com.passus.net.netflow.NetflowUdpSessionAnalyzer; +import com.passus.net.session.SessionAnalyzer; +import com.passus.st.Protocols; +import com.passus.st.client.ArrayListEventHandler; +import com.passus.st.client.Event; +import com.passus.st.source.PcapSessionEventSource; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author Mirosław Hawrot + */ +public class EventUtils { + + private EventUtils() { + } + + public static List<Event> readEvents(String pcapRelativeFile) { + return readEvents(pcapRelativeFile, null); + } + + public static List<Event> readEvents(String pcapRelativeFile, Map<String, Object> props) { + File pcapFile = ResourceUtils.getFile(pcapRelativeFile); + return readEvents(pcapFile, props); + } + + public static List<Event> readEvents(File pcapFile, Map<String, Object> props) { + try { + boolean allowPartialSession = false; + Set<Integer> protocols = ArrayUtils.asSet(Protocols.HTTP); + PortRangeSet portsRange = null; + if (props != null) { + allowPartialSession = (boolean) props.getOrDefault("allowPartialSession", false); + + Object ports = props.getOrDefault("ports", null); + if (ports instanceof PortRangeSet) { + portsRange = (PortRangeSet) ports; + } else if (ports instanceof Integer) { + portsRange = new PortRangeSet(); + portsRange.add((Integer) ports); + } + + protocols = (Set<Integer>) props.getOrDefault("protocols", ArrayUtils.asSet(Protocols.HTTP)); + } + + PcapSessionEventSource src = new PcapSessionEventSource(); + src.setAllowPartialSession(allowPartialSession); + src.setPcapFile(pcapFile.getAbsolutePath()); + + SessionAnalyzer analyzer; + if (protocols.contains(Protocols.HTTP)) { + analyzer = new HttpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); + } + + if (protocols.contains(Protocols.DNS)) { + analyzer = new DnsUdpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); + } + + if (protocols.contains(Protocols.NETFLOW)) { + analyzer = new NetflowUdpSessionAnalyzer(); + if (portsRange != null) { + analyzer.getPortsRange().addAll(portsRange); + } + src.addAnalyzer(analyzer); + } + + src.setLoops(1); + + ArrayListEventHandler handler = new ArrayListEventHandler(); + src.setHandler(handler); + + src.start(); + waitForSource(src); + src.stop(); + + return handler.getEvents(); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static void waitForSource(PcapSessionEventSource src) { + try { + Thread.sleep(200); + } catch (Exception e) { + } + + while (src.isWorking()) { + try { + Thread.sleep(100); + } catch (Exception e) { + } + } + } +}
--- a/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java Thu Apr 02 08:52:28 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/emitter/pcap/UnidirectionalUdpPcapEmitterTest.java Thu Apr 02 10:26:17 2020 +0200 @@ -1,5 +1,6 @@ package com.passus.st.emitter.pcap; +import com.passus.data.HeapByteBuff; import com.passus.net.session.Session; import com.passus.st.AbstractWireMockTest; import com.passus.st.client.TestClientHandler; @@ -54,4 +55,36 @@ } } + @Test + public void testSend() throws Exception { + final byte[] ETH = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0}; + + UnidirectionalUdpPcapEmitter emitter = createEmitter(null); + emitter.start(); + try { + SessionInfo info = new SessionInfo("1.1.1.1", 5000, HOST, port(), Session.PROTOCOL_UDP); + + TestClientHandler handler = new TestClientHandler() { + + @Override + protected void doChannelActive(ChannelContext context) throws Exception { + context.writeAndFlush(new HeapByteBuff(ETH)); + context.close(); + } + }; + + emitter.connect(info, handler, 0); + AbstractEmitterTest.waitConn(handler, 5, 5_000); + + AssertJUnit.assertEquals(5, handler.size()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_REGISTERED, handler.get(0).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_ACTIVE, handler.get(1).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.DATA_WRITTEN, handler.get(2).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_INACTIVE, handler.get(3).getType()); + AssertJUnit.assertEquals(TestClientHandler.EventType.CHANNEL_UNREGISTERED, handler.get(4).getType()); + } finally { + emitter.stop(); + } + } + }