Mercurial > stress-tester
changeset 982:ff71b37f293c
FlowExecutorDnsTest + SimpleDatagramServer improvements
author | Devel 2 |
---|---|
date | Wed, 31 Jul 2019 14:38:49 +0200 |
parents | da4a9beebbd3 |
children | d3a25b8f596d |
files | stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java |
diffstat | 2 files changed, 145 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Wed Jul 31 14:38:49 2019 +0200 @@ -0,0 +1,97 @@ +package com.passus.st.client.dns; + +import com.passus.commons.service.ServiceUtils; +import com.passus.commons.utils.ArrayUtils; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.dns.Dns; +import com.passus.net.dns.DnsARecord; +import com.passus.net.dns.DnsBuilder; +import com.passus.net.dns.DnsEncoder; +import com.passus.st.Protocols; +import com.passus.st.client.Event; +import com.passus.st.client.FlowExecutor; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import com.passus.st.utils.SimpleDatagramServer; +import com.passus.st.utils.SimpleDatagramServer.ServerContext; +import org.apache.commons.lang3.mutable.MutableObject; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; + +public class FlowExecutorDnsTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @Test + public void testHandle_DNS() throws Exception { + SimpleDatagramServer server = new SimpleDatagramServer(); + server.setPort(53); + server.setListener(new SimpleDatagramServer.ServerListener() { + @Override + public void onDataReceived(Object data, ServerContext context) { + Dns dns = new DnsBuilder().answerA("test.com", "1.1.1.1").build(); + DnsEncoder encoder = new DnsEncoder(); + ByteBuff buff = new HeapByteBuff(); + encoder.encode(dns, buff); + try { + context.write(buff); + } catch (IOException e) { + + } + } + + }); + server.start(); + + Map<String, Object> props = new HashMap<>(); + props.put("protocols", ArrayUtils.asSet(Protocols.DNS)); + props.put("allowPartialSession", true); + List<Event> events = EventUtils.readEvents("pcap/dns/dns_A_req_resp.pcap", props); + assertEquals(5, events.size()); + + NioEmitter emitter = prepareEmitter("192.168.170.20:53 -> localhost:53"); + emitter.start(); + + FlowExecutor flowExecutor = new FlowExecutor(); + flowExecutor.setEmitter(emitter); + flowExecutor.setConnectPartialSession(true); + try { + MutableObject<Dns> res = new MutableObject<>(); + flowExecutor.setListener((request, response, context) -> { + res.setValue((Dns) response); + }); + flowExecutor.start(); + events.forEach(flowExecutor::handle); + flowExecutor.join(); + + Dns dnsResponse = res.getValue(); + assertNotNull(dnsResponse); + assertEquals(1, dnsResponse.getAnswers().size()); + + DnsARecord dnsRecord = (DnsARecord) dnsResponse.getAnswers().get(0); + assertEquals("test.com", dnsRecord.getName()); + } finally { + ServiceUtils.stopQuietly(flowExecutor); + ServiceUtils.stopQuietly(emitter); + ServiceUtils.stopQuietly(server); + } + + } + +}
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Wed Jul 31 14:01:03 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Wed Jul 31 14:38:49 2019 +0200 @@ -2,6 +2,7 @@ import com.passus.commons.service.Service; import com.passus.commons.service.ServiceException; +import com.passus.data.ByteBuff; import com.passus.data.DataDecoder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -123,6 +124,7 @@ public void run() { working = true; byte[] buffer = new byte[8048]; + ServerContext context = new ServerContext(serverSocket); while (working) { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { @@ -131,12 +133,14 @@ } serverSocket.receive(packet); + context.clientAddress = packet.getAddress(); + context.clientPort = packet.getPort(); if (packet.getLength() > 0) { if (decoder != null) { decoder.decode(packet.getData(), packet.getOffset(), packet.getLength()); if (decoder.state() == DataDecoder.STATE_FINISHED) { if (listener != null) { - listener.onDataReceived(decoder.getResult()); + listener.onDataReceived(decoder.getResult(), context); } decoder.clear(); @@ -148,23 +152,23 @@ byte[] data = new byte[packet.getLength()]; System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); if (listener != null) { - listener.onDataReceived(data); + listener.onDataReceived(data, context); } } } } catch (SocketException e) { - if(e.getMessage().contains("socket closed")) { + if (e.getMessage().contains("socket closed")) { break; } if (listener != null) { - listener.onError(e); + listener.onError(e, context); } LOGGER.error(e.getMessage(), e); } catch (IOException e) { if (listener != null) { - listener.onError(e); + listener.onError(e, context); } LOGGER.error(e.getMessage(), e); @@ -177,9 +181,44 @@ public static interface ServerListener<T> { - void onDataReceived(T data); + default void onDataReceived(T data, ServerContext context) { - void onError(Exception ex); + } + + default void onError(Exception ex, ServerContext context) { + + } + } + + public static class ServerContext { + + private final DatagramSocket serverSocket; + + private InetAddress clientAddress; + + private int clientPort; + + public ServerContext(DatagramSocket serverSocket) { + this.serverSocket = serverSocket; + } + + public InetAddress getClientAddress() { + return clientAddress; + } + + public int getClientPort() { + return clientPort; + } + + public void write(ByteBuff buff) throws IOException { + write(buff.buffer(), buff.startIndex(), buff.length()); + } + + public void write(byte[] data, int offset, int length) throws IOException { + DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort); + serverSocket.send(packet); + } + } public static class DefaultServerListener<T> implements ServerListener<T> { @@ -222,7 +261,7 @@ } @Override - public void onDataReceived(T data) { + public void onDataReceived(T data, ServerContext context) { long now = System.currentTimeMillis(); if (firstTimestamp == -1) { firstTimestamp = now; @@ -233,7 +272,7 @@ } @Override - public void onError(Exception ex) { + public void onError(Exception ex, ServerContext context) { errors++; lastException = ex; }