changeset 992:2e7e654f2ad5

SimpleDatagramServer refactorization
author Devel 2
date Fri, 06 Sep 2019 09:41:12 +0200
parents 1d5d4cfd88fb
children 604e4ef38dee
files stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java stress-tester/src/test/java/com/passus/st/utils/server/DefaultServerListener.java stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java stress-tester/src/test/java/com/passus/st/utils/server/ServerListener.java stress-tester/src/test/java/com/passus/st/utils/server/SimpleDatagramServer.java stress-tester/src/test/java/com/passus/st/utils/server/SimpleServer.java
diffstat 9 files changed, 319 insertions(+), 285 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Thu Sep 05 11:36:59 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java	Fri Sep 06 09:41:12 2019 +0200
@@ -14,8 +14,9 @@
 import com.passus.st.emitter.RuleBasedSessionMapper;
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.utils.EventUtils;
-import com.passus.st.utils.SimpleDatagramServer;
-import com.passus.st.utils.SimpleDatagramServer.ServerContext;
+import com.passus.st.utils.server.ServerListener;
+import com.passus.st.utils.server.SimpleDatagramServer;
+import com.passus.st.utils.server.ServerContext;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.testng.annotations.Test;
 
@@ -42,7 +43,7 @@
     public void testHandle_DNS() throws Exception {
         SimpleDatagramServer server = new SimpleDatagramServer();
         server.setPort(2000);
-        server.setListener(new SimpleDatagramServer.ServerListener() {
+        server.setListener(new ServerListener() {
             @Override
             public void onDataReceived(Object data, ServerContext context) {
                 Dns dns = new DnsBuilder().answerA("test.com", "1.1.1.1").build();
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Thu Sep 05 11:36:59 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java	Fri Sep 06 09:41:12 2019 +0200
@@ -10,8 +10,8 @@
 import com.passus.st.emitter.RuleBasedSessionMapper;
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.utils.EventUtils;
-import com.passus.st.utils.SimpleDatagramServer;
-import com.passus.st.utils.SimpleDatagramServer.DefaultServerListener;
+import com.passus.st.utils.server.SimpleDatagramServer;
+import com.passus.st.utils.server.DefaultServerListener;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java	Thu Sep 05 11:36:59 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,280 +0,0 @@
-package com.passus.st.utils;
-
-import com.passus.commons.service.Service;
-import com.passus.commons.service.ServiceException;
-import com.passus.data.ByteBuff;
-import com.passus.data.DataDecoder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SimpleDatagramServer implements Service {
-
-    private static final Logger LOGGER = LogManager.getLogger(SimpleDatagramServer.class);
-
-    private String host = "localhost";
-
-    private int port = 2000;
-
-    private DataDecoder decoder;
-
-    private boolean started;
-
-    private ServerThread serverThread;
-
-    private ServerListener listener;
-
-    private DatagramSocket serverSocket;
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    public DataDecoder getDecoder() {
-        return decoder;
-    }
-
-    public void setDecoder(DataDecoder decoder) {
-        this.decoder = decoder;
-    }
-
-    public ServerListener getListener() {
-        return listener;
-    }
-
-    public void setListener(ServerListener listener) {
-        this.listener = listener;
-    }
-
-    @Override
-    public boolean isStarted() {
-        return started;
-    }
-
-    @Override
-    public void start() {
-        if (started) {
-            return;
-        }
-
-        try {
-            serverSocket = new DatagramSocket(port, InetAddress.getByName(host));
-            LOGGER.debug("Datagram socket created.");
-            serverThread = new ServerThread(serverSocket);
-            serverThread.start();
-            started = true;
-        } catch (IOException e) {
-            throw new ServiceException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (!started) {
-            return;
-        }
-
-        try {
-            serverSocket.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        serverThread.working = false;
-        serverThread.interrupt();
-        try {
-            serverThread.join();
-        } catch (InterruptedException ignore) {
-
-        }
-
-        started = false;
-    }
-
-    private class ServerThread extends Thread {
-
-        private final DatagramSocket serverSocket;
-
-        private boolean working = false;
-
-        public ServerThread(DatagramSocket serverSocket) {
-            this.serverSocket = serverSocket;
-        }
-
-        @Override
-        public void run() {
-            working = true;
-            byte[] buffer = new byte[8048];
-            ServerContext context = new ServerContext(serverSocket);
-            while (working) {
-                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
-                try {
-                    if (serverSocket.isClosed()) {
-                        break;
-                    }
-
-                    serverSocket.receive(packet);
-                    context.clientAddress = packet.getAddress();
-                    context.clientPort = packet.getPort();
-                    if (packet.getLength() > 0) {
-                        if (decoder != null) {
-                            decoder.decode(packet.getData(), packet.getOffset(), packet.getLength());
-                            if (decoder.state() == DataDecoder.STATE_FINISHED) {
-                                if (listener != null) {
-                                    listener.onDataReceived(decoder.getResult(), context);
-                                }
-
-                                decoder.clear();
-                            } else if (decoder.state() == DataDecoder.STATE_ERROR) {
-                                LOGGER.error("Decoder error. " + decoder.getLastError());
-                                decoder.clear();
-                            }
-                        } else {
-                            byte[] data = new byte[packet.getLength()];
-                            System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
-                            if (listener != null) {
-                                listener.onDataReceived(data, context);
-                            }
-                        }
-                    }
-                } catch (SocketException e) {
-                    if (e.getMessage().contains("socket closed")) {
-                        break;
-                    }
-
-                    if (listener != null) {
-                        listener.onError(e, context);
-                    }
-
-                    LOGGER.error(e.getMessage(), e);
-                } catch (IOException e) {
-                    if (listener != null) {
-                        listener.onError(e, context);
-                    }
-
-                    LOGGER.error(e.getMessage(), e);
-                }
-            }
-
-            LOGGER.debug("Server thread stopped.");
-        }
-    }
-
-    public static interface ServerListener<T> {
-
-        default void onDataReceived(T data, ServerContext context) {
-
-        }
-
-        default void onError(Exception ex, ServerContext context) {
-
-        }
-    }
-
-    public static class ServerContext {
-
-        private final DatagramSocket serverSocket;
-
-        private InetAddress clientAddress;
-
-        private int clientPort;
-
-        public ServerContext(DatagramSocket serverSocket) {
-            this.serverSocket = serverSocket;
-        }
-
-        public InetAddress getClientAddress() {
-            return clientAddress;
-        }
-
-        public int getClientPort() {
-            return clientPort;
-        }
-
-        public void write(ByteBuff buff) throws IOException {
-            write(buff.buffer(), buff.startIndex(), buff.length());
-        }
-
-        public void write(byte[] data, int offset, int length) throws IOException {
-            DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort);
-            serverSocket.send(packet);
-        }
-
-    }
-
-    public static class DefaultServerListener<T> implements ServerListener<T> {
-
-        private List<T> received = new ArrayList<>();
-
-        private long firstTimestamp = -1;
-
-        private long lastTimestamp = -1;
-
-        private int errors = 0;
-
-        private Exception lastException;
-
-        public List<T> getReceived() {
-            return received;
-        }
-
-        public long getFirstTimestamp() {
-            return firstTimestamp;
-        }
-
-        public long getLastTimestamp() {
-            return lastTimestamp;
-        }
-
-        public int getErrors() {
-            return errors;
-        }
-
-        public Exception getLastException() {
-            return lastException;
-        }
-
-        public void clear() {
-            received.clear();
-            firstTimestamp = -1;
-            lastTimestamp = -1;
-            errors = 0;
-        }
-
-        @Override
-        public void onDataReceived(T data, ServerContext context) {
-            long now = System.currentTimeMillis();
-            if (firstTimestamp == -1) {
-                firstTimestamp = now;
-            }
-
-            received.add(data);
-            lastTimestamp = now;
-        }
-
-        @Override
-        public void onError(Exception ex, ServerContext context) {
-            errors++;
-            lastException = ex;
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,69 @@
+package com.passus.st.utils.server;
+
+import com.passus.data.ByteBuff;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+
+/**
+ * {@link ServerContext} backed by a {@link DatagramSocket}: every write is sent as a single
+ * datagram to the client address and port currently stored in this context.
+ */
+public class DatagramServerContext implements ServerContext {
+
+    private final DatagramSocket serverSocket;
+
+    // Package-private on purpose: SimpleDatagramServer assigns it for every received packet.
+    InetAddress clientAddress;
+
+    // Package-private on purpose: SimpleDatagramServer assigns it for every received packet.
+    int clientPort;
+
+    /**
+     * @param serverSocket socket used to send the replies
+     */
+    public DatagramServerContext(DatagramSocket serverSocket) {
+        this.serverSocket = serverSocket;
+    }
+
+    /**
+     * @return address the replies are sent to; {@code null} until one has been assigned
+     */
+    public InetAddress getClientAddress() {
+        return clientAddress;
+    }
+
+    /**
+     * @return port the replies are sent to
+     */
+    public int getClientPort() {
+        return clientPort;
+    }
+
+    /**
+     * Sends the bytes described by {@code buff} to the current client.
+     *
+     * @param buff source of the backing array, start index and length of the bytes to send
+     * @throws IOException if the socket fails to send the datagram
+     */
+    @Override
+    public void write(ByteBuff buff) throws IOException {
+        write(buff.buffer(), buff.startIndex(), buff.length());
+    }
+
+    /**
+     * Sends {@code length} bytes of {@code data}, starting at {@code offset}, as one datagram.
+     *
+     * @param data   array holding the bytes to send
+     * @param offset index of the first byte to send
+     * @param length number of bytes to send
+     * @throws IOException if the socket fails to send the datagram
+     */
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+        DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort);
+        serverSocket.send(packet);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/DefaultServerListener.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,77 @@
+package com.passus.st.utils.server;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link ServerListener} that records what it is notified about: every received item, the
+ * time of the first and of the last item, the number of errors and the last exception.
+ *
+ * @param <T> type of the received items
+ */
+public class DefaultServerListener<T> implements ServerListener<T> {
+
+    private final List<T> received = new ArrayList<>();
+
+    private long firstTimestamp = -1;
+
+    private long lastTimestamp = -1;
+
+    private int errors = 0;
+
+    private Exception lastException;
+
+    /** @return the live list of items received so far, in arrival order */
+    public List<T> getReceived() {
+        return received;
+    }
+
+    /** @return {@code System.currentTimeMillis()} of the first received item, or -1 if there is none */
+    public long getFirstTimestamp() {
+        return firstTimestamp;
+    }
+
+    /** @return {@code System.currentTimeMillis()} of the last received item, or -1 if there is none */
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+    /** @return number of errors reported since creation or the last {@link #clear()} */
+    public int getErrors() {
+        return errors;
+    }
+
+    /** @return the most recently reported exception, or {@code null} if there is none */
+    public Exception getLastException() {
+        return lastException;
+    }
+
+    /** Resets the listener to its initial, empty state. */
+    public void clear() {
+        received.clear();
+        firstTimestamp = -1;
+        lastTimestamp = -1;
+        errors = 0;
+        // Drop the exception as well, otherwise it would outlive the counter it belongs to.
+        lastException = null;
+    }
+
+    /** Stores {@code data} and updates the first/last timestamps. */
+    @Override
+    public void onDataReceived(T data, ServerContext context) {
+        long now = System.currentTimeMillis();
+        if (firstTimestamp == -1) {
+            firstTimestamp = now;
+        }
+
+        received.add(data);
+        lastTimestamp = now;
+    }
+
+    /** Counts the error and remembers {@code ex} as the last exception. */
+    @Override
+    public void onError(Exception ex, ServerContext context) {
+        errors++;
+        lastException = ex;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,27 @@
+package com.passus.st.utils.server;
+
+import com.passus.data.ByteBuff;
+
+import java.io.IOException;
+
+/**
+ * Context passed to {@link ServerListener} callbacks; it lets a callback write bytes
+ * (the datagram implementation sends them to the client the data came from).
+ */
+public interface ServerContext {
+
+    /**
+     * Writes {@code length} bytes of {@code data}, starting at {@code offset}.
+     *
+     * @throws IOException if the write fails
+     */
+    void write(byte[] data, int offset, int length) throws IOException;
+
+    /**
+     * Writes the bytes described by {@code buff}.
+     *
+     * @throws IOException if the write fails
+     */
+    void write(ByteBuff buff) throws IOException;
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerListener.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,30 @@
+package com.passus.st.utils.server;
+
+/**
+ * Callbacks invoked by a server. Both methods do nothing by default, so an implementation
+ * overrides only the ones it needs.
+ *
+ * @param <T> type of the received data
+ */
+public interface ServerListener<T> {
+
+    /**
+     * Called when the server reports a failure.
+     *
+     * @param ex      the reported exception
+     * @param context context supplied by the reporting server
+     */
+    default void onError(Exception ex, ServerContext context) {
+        // no-op by default
+    }
+
+    /**
+     * Called with each piece of received data.
+     *
+     * @param data    the received data
+     * @param context context supplied by the server; offers write operations
+     */
+    default void onDataReceived(T data, ServerContext context) {
+        // no-op by default
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/SimpleDatagramServer.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,132 @@
+package com.passus.st.utils.server;
+
+import com.passus.commons.service.ServiceException;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketException;
+
+/**
+ * {@link SimpleServer} that receives datagrams on the configured host and port in a dedicated
+ * thread and passes every non-empty payload to {@code decode}.
+ */
+public class SimpleDatagramServer extends SimpleServer {
+
+    private boolean started;
+
+    private ServerThread serverThread;
+
+    private DatagramSocket serverSocket;
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Binds the datagram socket and starts the receiving thread; does nothing when already started.
+     *
+     * @throws ServiceException if resolving the host or creating the socket fails
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            serverSocket = new DatagramSocket(port, InetAddress.getByName(host));
+            logger.debug("Datagram socket created.");
+            serverThread = new ServerThread(serverSocket);
+            serverThread.start();
+            started = true;
+        } catch (IOException e) {
+            throw new ServiceException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Closes the socket and waits for the receiving thread to end; does nothing when not started.
+     */
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        try {
+            serverSocket.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        serverThread.working = false;
+        serverThread.interrupt();
+        try {
+            serverThread.join();
+        } catch (InterruptedException e) {
+            // Restore the flag so the caller can still observe that it was interrupted.
+            Thread.currentThread().interrupt();
+        }
+
+        started = false;
+    }
+
+    /** Passes a receive failure to the listener, if one is set, and logs it. */
+    private void reportError(IOException e, ServerContext context) {
+        if (listener != null) {
+            listener.onError(e, context);
+        }
+
+        logger.error(e.getMessage(), e);
+    }
+
+    /** Thread running the receive loop until the socket is closed or {@code working} is cleared. */
+    private class ServerThread extends Thread {
+
+        private final DatagramSocket serverSocket;
+
+        // Cleared by stop() from another thread, hence volatile; starts as true so run() never re-arms it.
+        private volatile boolean working = true;
+
+        public ServerThread(DatagramSocket serverSocket) {
+            this.serverSocket = serverSocket;
+        }
+
+        @Override
+        public void run() {
+            // The buffer is reused for every packet; receive() truncates datagrams longer than it.
+            byte[] buffer = new byte[8048];
+            DatagramServerContext context = new DatagramServerContext(serverSocket);
+            while (working) {
+                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+                try {
+                    if (serverSocket.isClosed()) {
+                        break;
+                    }
+
+                    serverSocket.receive(packet);
+                    context.clientAddress = packet.getAddress();
+                    context.clientPort = packet.getPort();
+                    if (packet.getLength() > 0) {
+                        decode(packet.getData(), packet.getOffset(), packet.getLength(), context);
+                    }
+                } catch (SocketException e) {
+                    // A closed socket means stop() was called. Ask the socket rather than matching the
+                    // exception text, which may be null and whose wording is not specified.
+                    if (serverSocket.isClosed()) {
+                        break;
+                    }
+
+                    reportError(e, context);
+                } catch (IOException e) {
+                    reportError(e, context);
+                }
+            }
+
+            logger.debug("Server thread stopped.");
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/utils/server/SimpleServer.java	Fri Sep 06 09:41:12 2019 +0200
@@ -0,0 +1,89 @@
+package com.passus.st.utils.server;
+
+import com.passus.commons.service.Service;
+import com.passus.data.DataDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Base class for simple servers: holds the host, port, optional {@link DataDecoder} and
+ * {@link ServerListener}, and turns received bytes into listener notifications.
+ */
+public abstract class SimpleServer implements Service {
+
+    protected final Logger logger = LogManager.getLogger(getClass());
+
+    protected String host = "localhost";
+
+    protected int port = 2000;
+
+    private DataDecoder decoder;
+
+    protected ServerListener listener;
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public DataDecoder getDecoder() {
+        return decoder;
+    }
+
+    public void setDecoder(DataDecoder decoder) {
+        this.decoder = decoder;
+    }
+
+    public ServerListener getListener() {
+        return listener;
+    }
+
+    public void setListener(ServerListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Feeds received bytes to the decoder, or straight to the listener when no decoder is set.
+     * <p>
+     * With a decoder, the listener gets {@code decoder.getResult()} once the decoder reports
+     * {@code STATE_FINISHED}; on {@code STATE_ERROR} the error is logged. The decoder is cleared
+     * in both cases. Without a decoder, the listener gets a copy of exactly the received bytes.
+     *
+     * @param data    array containing the received bytes
+     * @param offset  index of the first received byte in {@code data}
+     * @param length  number of received bytes
+     * @param context context passed on to the listener
+     */
+    protected void decode(byte[] data, int offset, int length, ServerContext context) {
+        if (decoder != null) {
+            decoder.decode(data, offset, length);
+            if (decoder.state() == DataDecoder.STATE_FINISHED) {
+                if (listener != null) {
+                    listener.onDataReceived(decoder.getResult(), context);
+                }
+
+                decoder.clear();
+            } else if (decoder.state() == DataDecoder.STATE_ERROR) {
+                logger.error("Decoder error. " + decoder.getLastError());
+                decoder.clear();
+            }
+        } else if (listener != null) {
+            // Hand out a private, exactly sized copy: SimpleDatagramServer passes its reusable
+            // receive buffer here, so the listener must not be given that array itself.
+            byte[] payload = new byte[length];
+            System.arraycopy(data, offset, payload, 0, length);
+            listener.onDataReceived(payload, context);
+        }
+    }
+}