changeset 976:aff81768741e

FlowExecutorNetflowTest
author Devel 2
date Thu, 25 Jul 2019 11:25:45 +0200
parents 1d6458a1c1da
children 71d0f31b7e44
files stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java
diffstat 2 files changed, 269 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Jul 25 11:25:45 2019 +0200
@@ -0,0 +1,75 @@
+package com.passus.st.client.netflow;
+
+import com.passus.commons.service.ServiceUtils;
+import com.passus.commons.utils.ArrayUtils;
+import com.passus.net.netflow.Netflow9;
+import com.passus.net.netflow.Netflow9Decoder;
+import com.passus.st.Protocols;
+import com.passus.st.client.Event;
+import com.passus.st.client.FlowExecutor;
+import com.passus.st.emitter.RuleBasedSessionMapper;
+import com.passus.st.emitter.nio.NioEmitter;
+import com.passus.st.utils.EventUtils;
+import com.passus.st.utils.SimpleDatagramServer;
+import com.passus.st.utils.SimpleDatagramServer.DefaultServerListener;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+public class FlowExecutorNetflowTest {
+
+    private NioEmitter prepareEmitter(String mapperRule) throws Exception {
+        RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper();
+        sessionMapper.addRule(mapperRule);
+
+        NioEmitter emitter = new NioEmitter();
+        emitter.setSessionMapper(sessionMapper);
+        return emitter;
+    }
+
+    @Test
+    public void testHandle_Netflow() throws Exception {
+        DefaultServerListener<Netflow9> listener = new DefaultServerListener<>();
+        SimpleDatagramServer server = new SimpleDatagramServer();
+        server.setDecoder(new Netflow9Decoder());
+        server.setListener(listener);
+        server.start();
+
+        Map<String, Object> props = new HashMap<>();
+        props.put("protocols", ArrayUtils.asSet(Protocols.NETFLOW));
+        List<Event> events = EventUtils.readEvents("pcap/netflow/netflow_v9_template_data.pcap", props);
+
+        NioEmitter emitter = prepareEmitter("*:* -> localhost:2000");
+        emitter.start();
+
+        FlowExecutor flowExecutor = new FlowExecutor();
+        flowExecutor.setEmitter(emitter);
+        flowExecutor.setConnectPartialSession(true);
+        flowExecutor.setWorkerType("parallel");
+        try {
+            flowExecutor.setListener((request, response, context) -> {
+                System.out.println(response);
+            });
+            flowExecutor.start();
+            events.forEach(flowExecutor::handle);
+            flowExecutor.join();
+
+            assertEquals(0, listener.getErrors());
+            assertEquals(2, listener.getReceived().size());
+            Iterator<Netflow9> it = listener.getReceived().iterator();
+            assertEquals(92463920L, it.next().getFlowSequence());
+            assertEquals(92463921L, it.next().getFlowSequence());
+        } finally {
+            ServiceUtils.stopQuietly(flowExecutor);
+            ServiceUtils.stopQuietly(emitter);
+            ServiceUtils.stopQuietly(server);
+        }
+
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Thu Jul 25 11:25:45 2019 +0200
@@ -0,0 +1,194 @@
+package com.passus.st.utils;
+
+import com.passus.commons.service.Service;
+import com.passus.commons.service.ServiceException;
+import com.passus.data.DataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleDatagramServer implements Service {
+
+    private static final Logger LOGGER = LogManager.getLogger(SimpleDatagramServer.class);
+
+    private String host = "localhost";
+
+    private int port = 2000;
+
+    private DataDecoder decoder;
+
+    private boolean started;
+
+    private ServerThread serverThread;
+
+    private ServerListener listener;
+
+    private DatagramSocket serverSocket;
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public DataDecoder getDecoder() {
+        return decoder;
+    }
+
+    public void setDecoder(DataDecoder decoder) {
+        this.decoder = decoder;
+    }
+
+    public ServerListener getListener() {
+        return listener;
+    }
+
+    public void setListener(ServerListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            DatagramSocket serverSocket = new DatagramSocket(port, InetAddress.getByName(host));
+            LOGGER.debug("Datagram socket created.");
+            serverThread = new ServerThread(serverSocket);
+            serverThread.start();
+            started = true;
+        } catch (IOException e) {
+            throw new ServiceException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        serverThread.working = false;
+        serverSocket.disconnect();
+        serverThread.interrupt();
+        try {
+            serverThread.join(5_000);
+        } catch (InterruptedException ignore) {
+
+        }
+
+        started = false;
+    }
+
+    private class ServerThread extends Thread {
+
+        private final DatagramSocket serverSocket;
+
+        private boolean working = false;
+
+        public ServerThread(DatagramSocket serverSocket) {
+            this.serverSocket = serverSocket;
+        }
+
+        @Override
+        public void run() {
+            working = true;
+            byte[] buffer = new byte[8048];
+            while (working) {
+                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+                try {
+                    serverSocket.receive(packet);
+                    if (packet.getLength() > 0) {
+                        if (decoder != null) {
+                            decoder.decode(packet.getData(), packet.getOffset(), packet.getLength());
+                            if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                                if (listener != null) {
+                                    listener.onDataReceived(decoder.getResult());
+                                }
+
+                                decoder.clear();
+                            } else if (decoder.state() == DataDecoder.STATE_ERROR) {
+                                LOGGER.error("Decoder error. " + decoder.getLastError());
+                                decoder.clear();
+                            }
+                        } else {
+                            byte[] data = new byte[packet.getLength()];
+                            System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
+                            if (listener != null) {
+                                listener.onDataReceived(data);
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    if (listener != null) {
+                        listener.onError(e);
+                    }
+
+                    LOGGER.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    public static interface ServerListener<T> {
+
+        void onDataReceived(T data);
+
+        void onError(Exception ex);
+    }
+
+    public static class DefaultServerListener<T> implements ServerListener<T> {
+
+        private List<T> received = new ArrayList<>();
+
+        private int errors = 0;
+
+        private Exception lastException;
+
+        public List<T> getReceived() {
+            return received;
+        }
+
+        public int getErrors() {
+            return errors;
+        }
+
+        public Exception getLastException() {
+            return lastException;
+        }
+
+        @Override
+        public void onDataReceived(T data) {
+            received.add(data);
+        }
+
+        @Override
+        public void onError(Exception ex) {
+            errors++;
+            lastException = ex;
+        }
+    }
+}