Mercurial > stress-tester
changeset 976:aff81768741e
FlowExecutorNetflowTest
author | Devel 2 |
---|---|
date | Thu, 25 Jul 2019 11:25:45 +0200 |
parents | 1d6458a1c1da |
children | 71d0f31b7e44 |
files | stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java |
diffstat | 2 files changed, 269 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/client/netflow/FlowExecutorNetflowTest.java Thu Jul 25 11:25:45 2019 +0200 @@ -0,0 +1,75 @@ +package com.passus.st.client.netflow; + +import com.passus.commons.service.ServiceUtils; +import com.passus.commons.utils.ArrayUtils; +import com.passus.net.netflow.Netflow9; +import com.passus.net.netflow.Netflow9Decoder; +import com.passus.st.Protocols; +import com.passus.st.client.Event; +import com.passus.st.client.FlowExecutor; +import com.passus.st.emitter.RuleBasedSessionMapper; +import com.passus.st.emitter.nio.NioEmitter; +import com.passus.st.utils.EventUtils; +import com.passus.st.utils.SimpleDatagramServer; +import com.passus.st.utils.SimpleDatagramServer.DefaultServerListener; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.testng.AssertJUnit.assertEquals; + +public class FlowExecutorNetflowTest { + + private NioEmitter prepareEmitter(String mapperRule) throws Exception { + RuleBasedSessionMapper sessionMapper = new RuleBasedSessionMapper(); + sessionMapper.addRule(mapperRule); + + NioEmitter emitter = new NioEmitter(); + emitter.setSessionMapper(sessionMapper); + return emitter; + } + + @Test + public void testHandle_Netflow() throws Exception { + DefaultServerListener<Netflow9> listener = new DefaultServerListener<>(); + SimpleDatagramServer server = new SimpleDatagramServer(); + server.setDecoder(new Netflow9Decoder()); + server.setListener(listener); + server.start(); + + Map<String, Object> props = new HashMap<>(); + props.put("protocols", ArrayUtils.asSet(Protocols.NETFLOW)); + List<Event> events = EventUtils.readEvents("pcap/netflow/netflow_v9_template_data.pcap", props); + + NioEmitter emitter = prepareEmitter("*:* -> localhost:2000"); + emitter.start(); + + FlowExecutor flowExecutor = new FlowExecutor(); + flowExecutor.setEmitter(emitter); + flowExecutor.setConnectPartialSession(true); + flowExecutor.setWorkerType("parallel"); + try { + flowExecutor.setListener((request, response, context) -> { + System.out.println(response); + }); + flowExecutor.start(); + events.forEach(flowExecutor::handle); + flowExecutor.join(); + + assertEquals(0, listener.getErrors()); + assertEquals(2, listener.getReceived().size()); + Iterator<Netflow9> it = listener.getReceived().iterator(); + assertEquals(92463920L, it.next().getFlowSequence()); + assertEquals(92463921L, it.next().getFlowSequence()); + } finally { + ServiceUtils.stopQuietly(flowExecutor); + ServiceUtils.stopQuietly(emitter); + ServiceUtils.stopQuietly(server); + } + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Thu Jul 25 11:25:45 2019 +0200 @@ -0,0 +1,194 @@ +package com.passus.st.utils; + +import com.passus.commons.service.Service; +import com.passus.commons.service.ServiceException; +import com.passus.data.DataDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +public class SimpleDatagramServer implements Service { + + private static final Logger LOGGER = LogManager.getLogger(SimpleDatagramServer.class); + + private String host = "localhost"; + + private int port = 2000; + + private DataDecoder decoder; + + private boolean started; + + private ServerThread serverThread; + + private ServerListener listener; + + private DatagramSocket serverSocket; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public DataDecoder getDecoder() { + return decoder; + } + + public void setDecoder(DataDecoder decoder) { + this.decoder = decoder; + } + + public ServerListener getListener() { + return listener; + } + + public void setListener(ServerListener listener) { + this.listener = listener; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (started) { + return; + } + + try { + DatagramSocket serverSocket = new DatagramSocket(port, InetAddress.getByName(host)); + LOGGER.debug("Datagram socket created."); + serverThread = new ServerThread(serverSocket); + serverThread.start(); + started = true; + } catch (IOException e) { + throw new ServiceException(e.getMessage(), e); + } + } + + @Override + public void stop() { + if (!started) { + return; + } + + serverThread.working = false; + serverSocket.disconnect(); + serverThread.interrupt(); + try { + serverThread.join(5_000); + } catch (InterruptedException ignore) { + + } + + started = false; + } + + private class ServerThread extends Thread { + + private final DatagramSocket serverSocket; + + private boolean working = false; + + public ServerThread(DatagramSocket serverSocket) { + this.serverSocket = serverSocket; + } + + @Override + public void run() { + working = true; + byte[] buffer = new byte[8048]; + while (working) { + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + serverSocket.receive(packet); + if (packet.getLength() > 0) { + if (decoder != null) { + decoder.decode(packet.getData(), packet.getOffset(), packet.getLength()); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + if (listener != null) { + listener.onDataReceived(decoder.getResult()); + } + + decoder.clear(); + } else if (decoder.state() == DataDecoder.STATE_ERROR) { + LOGGER.error("Decoder error. " + decoder.getLastError()); + decoder.clear(); + } + } else { + byte[] data = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); + if (listener != null) { + listener.onDataReceived(data); + } + } + } + } catch (IOException e) { + if (listener != null) { + listener.onError(e); + } + + LOGGER.error(e.getMessage(), e); + } + } + } + } + + public static interface ServerListener<T> { + + void onDataReceived(T data); + + void onError(Exception ex); + } + + public static class DefaultServerListener<T> implements ServerListener<T> { + + private List<T> received = new ArrayList<>(); + + private int errors = 0; + + private Exception lastException; + + public List<T> getReceived() { + return received; + } + + public int getErrors() { + return errors; + } + + public Exception getLastException() { + return lastException; + } + + @Override + public void onDataReceived(T data) { + received.add(data); + } + + @Override + public void onError(Exception ex) { + errors++; + lastException = ex; + } + } +}