changeset 982:ff71b37f293c

FlowExecutorDnsTest + SimpleDatagramServer improvements
author Devel 2
date Wed, 31 Jul 2019 14:38:49 +0200
parents da4a9beebbd3
children d3a25b8f596d
files stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java
diffstat 2 files changed, 145 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Wed Jul 31 14:38:49 2019 +0200
@@ -0,0 +1,97 @@
+package com.passus.st.client.dns;
+
+import com.passus.commons.service.ServiceUtils;
+import com.passus.commons.utils.ArrayUtils;
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.dns.Dns;
+import com.passus.net.dns.DnsARecord;
+import com.passus.net.dns.DnsBuilder;
+import com.passus.net.dns.DnsEncoder;
+import com.passus.st.Protocols;
+import com.passus.st.client.Event;
+import com.passus.st.client.FlowExecutor;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import com.passus.st.utils.SimpleDatagramServer;
+import com.passus.st.utils.SimpleDatagramServer.ServerContext;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+
+/** Replays captured DNS events through {@link FlowExecutor} against a local UDP test server. */
+public class FlowExecutorDnsTest {
+
+    /** Builds a {@link NioEmitter} whose session mapper holds the single rule {@code mapperRule}. */
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    /** Checks that the A-record reply produced by the local server reaches the flow listener. */
+    @Test
+    public void testHandle_DNS() throws Exception {
+        // Set by the server thread when the reply cannot be sent; rethrown after join().
+        MutableObject<IOException> writeError = new MutableObject<>();
+        SimpleDatagramServer server = new SimpleDatagramServer();
+        server.setPort(53); // NOTE(review): binding port 53 usually needs elevated privileges
+        server.setListener(new SimpleDatagramServer.ServerListener() {
+            @Override
+            public void onDataReceived(Object data, ServerContext context) {
+                Dns dns = new DnsBuilder().answerA("test.com", "1.1.1.1").build();
+                DnsEncoder encoder = new DnsEncoder();
+                ByteBuff buff = new HeapByteBuff();
+                encoder.encode(dns, buff);
+                try {
+                    context.write(buff);
+                } catch (IOException e) {
+                    writeError.setValue(e);
+                }
+            }
+        });
+        NioEmitter emitter = prepareEmitter("192.168.170.20:53 -> localhost:53");
+        FlowExecutor flowExecutor = new FlowExecutor();
+        try {
+            // Start inside try so finally always stops them; assumes stopQuietly tolerates unstarted services.
+            server.start();
+            Map<String, Object> props = new HashMap<>();
+            props.put("protocols", ArrayUtils.asSet(Protocols.DNS));
+            props.put("allowPartialSession", true);
+            List<Event> events = EventUtils.readEvents("pcap/dns/dns_A_req_resp.pcap", props);
+            assertEquals(5, events.size());
+
+            emitter.start();
+            flowExecutor.setEmitter(emitter);
+            flowExecutor.setConnectPartialSession(true);
+            MutableObject<Dns> res = new MutableObject<>();
+            flowExecutor.setListener((request, response, context) -> res.setValue((Dns) response));
+            flowExecutor.start();
+            events.forEach(flowExecutor::handle);
+            flowExecutor.join();
+
+            if (writeError.getValue() != null) {
+                throw writeError.getValue();
+            }
+            Dns dnsResponse = res.getValue();
+            assertNotNull(dnsResponse);
+            assertEquals(1, dnsResponse.getAnswers().size());
+            DnsARecord dnsRecord = (DnsARecord) dnsResponse.getAnswers().get(0);
+            assertEquals("test.com", dnsRecord.getName());
+        } finally {
+            ServiceUtils.stopQuietly(flowExecutor);
+            ServiceUtils.stopQuietly(emitter);
+            ServiceUtils.stopQuietly(server);
+        }
+    }
+}
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Wed Jul 31 14:01:03 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Wed Jul 31 14:38:49 2019 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.commons.service.Service;
 import com.passus.commons.service.ServiceException;
+import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -123,6 +124,7 @@
         public void run() {
             working = true;
             byte[] buffer = new byte[8048];
+            ServerContext context = new ServerContext(serverSocket);
             while (working) {
                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                 try {
@@ -131,12 +133,14 @@
                     }
 
                     serverSocket.receive(packet);
+                    context.clientAddress = packet.getAddress();
+                    context.clientPort = packet.getPort();
                     if (packet.getLength() > 0) {
                         if (decoder != null) {
                             decoder.decode(packet.getData(), packet.getOffset(), packet.getLength());
                             if (decoder.state() == DataDecoder.STATE_FINISHED) {
                                 if (listener != null) {
-                                    listener.onDataReceived(decoder.getResult());
+                                    listener.onDataReceived(decoder.getResult(), context);
                                 }
 
                                 decoder.clear();
@@ -148,23 +152,23 @@
                             byte[] data = new byte[packet.getLength()];
                             System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
                             if (listener != null) {
-                                listener.onDataReceived(data);
+                                listener.onDataReceived(data, context);
                             }
                         }
                     }
                 } catch (SocketException e) {
-                    if(e.getMessage().contains("socket closed")) {
+                    if (e.getMessage().contains("socket closed")) {
                         break;
                     }
 
                     if (listener != null) {
-                        listener.onError(e);
+                        listener.onError(e, context);
                     }
 
                     LOGGER.error(e.getMessage(), e);
                 } catch (IOException e) {
                     if (listener != null) {
-                        listener.onError(e);
+                        listener.onError(e, context);
                     }
 
                     LOGGER.error(e.getMessage(), e);
@@ -177,9 +181,44 @@
 
     public static interface ServerListener<T> {
 
-        void onDataReceived(T data);
+        default void onDataReceived(T data, ServerContext context) {
 
-        void onError(Exception ex);
+        }
+
+        default void onError(Exception ex, ServerContext context) {
+
+        }
+    }
+
+    /** Reply handle reused by the receive loop; client fields describe the last datagram received. */
+    public static class ServerContext {
+        private final DatagramSocket serverSocket;
+        private InetAddress clientAddress;
+        private int clientPort;
+
+        public ServerContext(DatagramSocket serverSocket) {
+            this.serverSocket = serverSocket;
+        }
+
+        public InetAddress getClientAddress() {
+            return clientAddress;
+        }
+
+        public int getClientPort() {
+            return clientPort;
+        }
+
+        /** Sends the bytes described by buff.buffer(), startIndex() and length() to the client. */
+        public void write(ByteBuff buff) throws IOException {
+            write(buff.buffer(), buff.startIndex(), buff.length());
+        }
+        /** Sends a datagram to the recorded client; fails fast when none was recorded yet. */
+        public void write(byte[] data, int offset, int length) throws IOException {
+            if (clientAddress == null) {
+                throw new IllegalStateException("No client address recorded yet");
+            }
+            serverSocket.send(new DatagramPacket(data, offset, length, clientAddress, clientPort));
+        }
+    }
 
     public static class DefaultServerListener<T> implements ServerListener<T> {
@@ -222,7 +261,7 @@
         }
 
         @Override
-        public void onDataReceived(T data) {
+        public void onDataReceived(T data, ServerContext context) {
             long now = System.currentTimeMillis();
             if (firstTimestamp == -1) {
                 firstTimestamp = now;
@@ -233,7 +272,7 @@
         }
 
         @Override
-        public void onError(Exception ex) {
+        public void onError(Exception ex, ServerContext context) {
             errors++;
             lastException = ex;
         }