Mercurial > stress-tester
changeset 992:2e7e654f2ad5
SimpleDatagramServer refactorization
author | Devel 2 |
---|---|
date | Fri, 06 Sep 2019 09:41:12 +0200 |
parents | 1d5d4cfd88fb |
children | 604e4ef38dee |
files | stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java stress-tester/src/test/java/com/passus/st/utils/server/DefaultServerListener.java stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java stress-tester/src/test/java/com/passus/st/utils/server/ServerListener.java stress-tester/src/test/java/com/passus/st/utils/server/SimpleDatagramServer.java stress-tester/src/test/java/com/passus/st/utils/server/SimpleServer.java |
diffstat | 9 files changed, 319 insertions(+), 285 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Thu Sep 05 11:36:59 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/dns/FlowExecutorDnsTest.java Fri Sep 06 09:41:12 2019 +0200 @@ -14,8 +14,9 @@ import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.utils.EventUtils; -import com.passus.st.utils.SimpleDatagramServer; -import com.passus.st.utils.SimpleDatagramServer.ServerContext; +import com.passus.st.utils.server.ServerListener; +import com.passus.st.utils.server.SimpleDatagramServer; +import com.passus.st.utils.server.ServerContext; import org.apache.commons.lang3.mutable.MutableObject; import org.testng.annotations.Test; @@ -42,7 +43,7 @@ public void testHandle_DNS() throws Exception { SimpleDatagramServer server = new SimpleDatagramServer(); server.setPort(2000); - server.setListener(new SimpleDatagramServer.ServerListener() { + server.setListener(new ServerListener() { @Override public void onDataReceived(Object data, ServerContext context) { Dns dns = new DnsBuilder().answerA("test.com", "1.1.1.1").build();
--- a/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Sep 05 11:36:59 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Fri Sep 06 09:41:12 2019 +0200 @@ -10,8 +10,8 @@ import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.utils.EventUtils; -import com.passus.st.utils.SimpleDatagramServer; -import com.passus.st.utils.SimpleDatagramServer.DefaultServerListener; +import com.passus.st.utils.server.SimpleDatagramServer; +import com.passus.st.utils.server.DefaultServerListener; import org.testng.annotations.Test; import java.util.HashMap;
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Thu Sep 05 11:36:59 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,280 +0,0 @@ -package com.passus.st.utils; - -import com.passus.commons.service.Service; -import com.passus.commons.service.ServiceException; -import com.passus.data.ByteBuff; -import com.passus.data.DataDecoder; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.List; - -public class SimpleDatagramServer implements Service { - - private static final Logger LOGGER = LogManager.getLogger(SimpleDatagramServer.class); - - private String host = "localhost"; - - private int port = 2000; - - private DataDecoder decoder; - - private boolean started; - - private ServerThread serverThread; - - private ServerListener listener; - - private DatagramSocket serverSocket; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public DataDecoder getDecoder() { - return decoder; - } - - public void setDecoder(DataDecoder decoder) { - this.decoder = decoder; - } - - public ServerListener getListener() { - return listener; - } - - public void setListener(ServerListener listener) { - this.listener = listener; - } - - @Override - public boolean isStarted() { - return started; - } - - @Override - public void start() { - if (started) { - return; - } - - try { - serverSocket = new DatagramSocket(port, InetAddress.getByName(host)); - LOGGER.debug("Datagram socket created."); - serverThread = new ServerThread(serverSocket); - serverThread.start(); - started = true; - } catch (IOException e) { - throw new ServiceException(e.getMessage(), e); - } - } - - @Override - public void stop() { - if (!started) { - return; - } - - try { - serverSocket.close(); - } catch (Exception e) { - e.printStackTrace(); - } - - serverThread.working = false; - serverThread.interrupt(); - try { - serverThread.join(); - } catch (InterruptedException ignore) { - - } - - started = false; - } - - private class ServerThread extends Thread { - - private final DatagramSocket serverSocket; - - private boolean working = false; - - public ServerThread(DatagramSocket serverSocket) { - this.serverSocket = serverSocket; - } - - @Override - public void run() { - working = true; - byte[] buffer = new byte[8048]; - ServerContext context = new ServerContext(serverSocket); - while (working) { - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - try { - if (serverSocket.isClosed()) { - break; - } - - serverSocket.receive(packet); - context.clientAddress = packet.getAddress(); - context.clientPort = packet.getPort(); - if (packet.getLength() > 0) { - if (decoder != null) { - decoder.decode(packet.getData(), packet.getOffset(), packet.getLength()); - if (decoder.state() == DataDecoder.STATE_FINISHED) { - if (listener != null) { - listener.onDataReceived(decoder.getResult(), context); - } - - decoder.clear(); - } else if (decoder.state() == DataDecoder.STATE_ERROR) { - LOGGER.error("Decoder error. " + decoder.getLastError()); - decoder.clear(); - } - } else { - byte[] data = new byte[packet.getLength()]; - System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); - if (listener != null) { - listener.onDataReceived(data, context); - } - } - } - } catch (SocketException e) { - if (e.getMessage().contains("socket closed")) { - break; - } - - if (listener != null) { - listener.onError(e, context); - } - - LOGGER.error(e.getMessage(), e); - } catch (IOException e) { - if (listener != null) { - listener.onError(e, context); - } - - LOGGER.error(e.getMessage(), e); - } - } - - LOGGER.debug("Server thread stopped."); - } - } - - public static interface ServerListener<T> { - - default void onDataReceived(T data, ServerContext context) { - - } - - default void onError(Exception ex, ServerContext context) { - - } - } - - public static class ServerContext { - - private final DatagramSocket serverSocket; - - private InetAddress clientAddress; - - private int clientPort; - - public ServerContext(DatagramSocket serverSocket) { - this.serverSocket = serverSocket; - } - - public InetAddress getClientAddress() { - return clientAddress; - } - - public int getClientPort() { - return clientPort; - } - - public void write(ByteBuff buff) throws IOException { - write(buff.buffer(), buff.startIndex(), buff.length()); - } - - public void write(byte[] data, int offset, int length) throws IOException { - DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort); - serverSocket.send(packet); - } - - } - - public static class DefaultServerListener<T> implements ServerListener<T> { - - private List<T> received = new ArrayList<>(); - - private long firstTimestamp = -1; - - private long lastTimestamp = -1; - - private int errors = 0; - - private Exception lastException; - - public List<T> getReceived() { - return received; - } - - public long getFirstTimestamp() { - return firstTimestamp; - } - - public long getLastTimestamp() { - return lastTimestamp; - } - - public int getErrors() { - return errors; - } - - public Exception getLastException() { - return lastException; - } - - public void clear() { - received.clear(); - firstTimestamp = -1; - lastTimestamp = -1; - errors = 0; - } - - @Override - public void onDataReceived(T data, ServerContext context) { - long now = System.currentTimeMillis(); - if (firstTimestamp == -1) { - firstTimestamp = now; - } - - received.add(data); - lastTimestamp = now; - } - - @Override - public void onError(Exception ex, ServerContext context) { - errors++; - lastException = ex; - } - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/DatagramServerContext.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,38 @@ +package com.passus.st.utils.server; + +import com.passus.data.ByteBuff; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +public class DatagramServerContext implements ServerContext { + + private final DatagramSocket serverSocket; + + InetAddress clientAddress; + + int clientPort; + + public DatagramServerContext(DatagramSocket serverSocket) { + this.serverSocket = serverSocket; + } + + public InetAddress getClientAddress() { + return clientAddress; + } + + public int getClientPort() { + return clientPort; + } + + public void write(ByteBuff buff) throws IOException { + write(buff.buffer(), buff.startIndex(), buff.length()); + } + + public void write(byte[] data, int offset, int length) throws IOException { + DatagramPacket packet = new DatagramPacket(data, offset, length, clientAddress, clientPort); + serverSocket.send(packet); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/DefaultServerListener.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,61 @@ +package com.passus.st.utils.server; + +import java.util.ArrayList; +import java.util.List; + +public class DefaultServerListener<T> implements ServerListener<T> { + + private List<T> received = new ArrayList<>(); + + private long firstTimestamp = -1; + + private long lastTimestamp = -1; + + private int errors = 0; + + private Exception lastException; + + public List<T> getReceived() { + return received; + } + + public long getFirstTimestamp() { + return firstTimestamp; + } + + public long getLastTimestamp() { + return lastTimestamp; + } + + public int getErrors() { + return errors; + } + + public Exception getLastException() { + return lastException; + } + + public void clear() { + received.clear(); + firstTimestamp = -1; + lastTimestamp = -1; + errors = 0; + } + + @Override + public void onDataReceived(T data, ServerContext context) { + long now = System.currentTimeMillis(); + if (firstTimestamp == -1) { + firstTimestamp = now; + } + + received.add(data); + lastTimestamp = now; + } + + @Override + public void onError(Exception ex, ServerContext context) { + errors++; + lastException = ex; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerContext.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,13 @@ +package com.passus.st.utils.server; + +import com.passus.data.ByteBuff; + +import java.io.IOException; + +public interface ServerContext { + + void write(ByteBuff buff) throws IOException; + + void write(byte[] data, int offset, int length) throws IOException; + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/ServerListener.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,12 @@ +package com.passus.st.utils.server; + +public interface ServerListener<T> { + + default void onDataReceived(T data, ServerContext context) { + + } + + default void onError(Exception ex, ServerContext context) { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/SimpleDatagramServer.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,117 @@ +package com.passus.st.utils.server; + +import com.passus.commons.service.ServiceException; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; + +public class SimpleDatagramServer extends SimpleServer { + + private boolean started; + + private ServerThread serverThread; + + private DatagramSocket serverSocket; + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (started) { + return; + } + + try { + serverSocket = new DatagramSocket(port, InetAddress.getByName(host)); + logger.debug("Datagram socket created."); + serverThread = new ServerThread(serverSocket); + serverThread.start(); + started = true; + } catch (IOException e) { + throw new ServiceException(e.getMessage(), e); + } + } + + @Override + public void stop() { + if (!started) { + return; + } + + try { + serverSocket.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + serverThread.working = false; + serverThread.interrupt(); + try { + serverThread.join(); + } catch (InterruptedException ignore) { + + } + + started = false; + } + + private class ServerThread extends Thread { + + private final DatagramSocket serverSocket; + + private boolean working = false; + + public ServerThread(DatagramSocket serverSocket) { + this.serverSocket = serverSocket; + } + + @Override + public void run() { + working = true; + byte[] buffer = new byte[8048]; + DatagramServerContext context = new DatagramServerContext(serverSocket); + while (working) { + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + if (serverSocket.isClosed()) { + break; + } + + serverSocket.receive(packet); + context.clientAddress = packet.getAddress(); + context.clientPort = packet.getPort(); + if (packet.getLength() > 0) { + decode(packet.getData(), packet.getOffset(), packet.getLength(), context); + } + } catch (SocketException e) { + if (e.getMessage().contains("socket closed")) { + break; + } + + if (listener != null) { + listener.onError(e, context); + } + + logger.error(e.getMessage(), e); + } catch (IOException e) { + if (listener != null) { + listener.onError(e, context); + } + + logger.error(e.getMessage(), e); + } + } + + logger.debug("Server thread stopped."); + } + } + + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/server/SimpleServer.java Fri Sep 06 09:41:12 2019 +0200 @@ -0,0 +1,72 @@ +package com.passus.st.utils.server; + +import com.passus.commons.service.Service; +import com.passus.data.DataDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class SimpleServer implements Service { + + protected final Logger logger = LogManager.getLogger(getClass()); + + protected String host = "localhost"; + + protected int port = 2000; + + private DataDecoder decoder; + + protected ServerListener listener; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public DataDecoder getDecoder() { + return decoder; + } + + public void setDecoder(DataDecoder decoder) { + this.decoder = decoder; + } + + public ServerListener getListener() { + return listener; + } + + public void setListener(ServerListener listener) { + this.listener = listener; + } + + protected void decode(byte[] data, int offset, int length, ServerContext context) { + if (decoder != null) { + decoder.decode(data, offset, length); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + if (listener != null) { + listener.onDataReceived(decoder.getResult(), context); + } + + decoder.clear(); + } else if (decoder.state() == DataDecoder.STATE_ERROR) { + logger.error("Decoder error. " + decoder.getLastError()); + decoder.clear(); + } + } else { + System.arraycopy(data, offset, data, 0, length); + if (listener != null) { + listener.onDataReceived(data, context); + } + } + } +}