changeset 967:57e0d4394116

NioDatagramChannelContext in progress
author Devel 2
date Mon, 08 Jul 2019 10:54:22 +0200
parents 74eb9b0e8b37
children 0de7ca4925a3
files stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java stress-tester/src/main/java/com/passus/st/emitter/SocketMatcher.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketEmitterWorker.java stress-tester/src/main/java/com/passus/st/job/JobExecutor.java stress-tester/src/test/java/com/passus/st/client/http/ReporterConfiguratorTest.java
diffstat 14 files changed, 764 insertions(+), 673 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java	Mon Jul 08 10:54:22 2019 +0200
@@ -2,41 +2,40 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.net.SocketAddress;
-import com.passus.st.emitter.SessionInfo;
+
 import java.io.IOException;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface ChannelContext {
 
-    public boolean isConnected();
-
-    public boolean isConnectionPending();
+    boolean isConnected();
 
-    public void write(byte[] data, int offset, int length) throws IOException;
+    boolean isConnectionPending();
 
-    public void write(ByteBuff data) throws IOException;
+    void write(byte[] data, int offset, int length) throws IOException;
 
-    public default void writeAndFlush(byte[] data, int offset, int length) throws IOException {
+    void write(ByteBuff data) throws IOException;
+
+    default void writeAndFlush(byte[] data, int offset, int length) throws IOException {
         write(data, offset, length);
         flush();
     }
 
-    public default void writeAndFlush(ByteBuff data) throws IOException {
+    default void writeAndFlush(ByteBuff data) throws IOException {
         write(data);
         flush();
     }
 
-    public void flush() throws IOException;
-
-    public void close() throws IOException;
+    void flush() throws IOException;
 
-    public SocketAddress getLocalAddress();
+    void close() throws IOException;
 
-    public SocketAddress getRemoteAddress();
+    SocketAddress getLocalAddress();
 
-    public SessionInfo getSessionInfo();
+    SocketAddress getRemoteAddress();
+
+    SessionInfo getSessionInfo();
 
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Mon Jul 08 10:54:22 2019 +0200
@@ -8,38 +8,30 @@
 public interface EmitterHandler {
 
     default void channelRegistered(ChannelContext context) throws Exception {
-
     }
 
     default void channelUnregistered(ChannelContext context) throws Exception {
-
     }
 
     default void channelActive(ChannelContext context) throws Exception {
-
     }
 
     default void channelInactive(ChannelContext context) throws Exception {
-
     }
 
     default void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
     }
 
     default void dataWriteStart(ChannelContext context) {
-
     }
 
     default void dataWritten(ChannelContext context) throws Exception {
-
     }
 
     default void sessionInvalidated(SessionInfo session) throws Exception {
-
     }
 
     default void errorOccurred(ChannelContext context, Throwable cause) throws Exception {
-
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java	Mon Jul 08 10:54:22 2019 +0200
@@ -5,18 +5,17 @@
 import com.passus.net.SocketAddress;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public interface SessionMapper {
 
-    public static final SocketAddress ANY_SOCKET = new SocketAddress(Ip4Address.EMPTY_ADDRESS, 0);
+    SocketAddress ANY_SOCKET = new SocketAddress(Ip4Address.EMPTY_ADDRESS, 0);
 
-    public boolean invalidate(SessionInfo sessionInfo);
+    boolean invalidate(SessionInfo sessionInfo);
 
-    public ConnectionParams map(SessionInfo sessionInfo);
+    ConnectionParams map(SessionInfo sessionInfo);
 
-    public static class ConnectionParams {
+    class ConnectionParams {
 
         private final SocketAddress remoteAddress;
 
--- a/stress-tester/src/main/java/com/passus/st/emitter/SocketMatcher.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SocketMatcher.java	Mon Jul 08 10:54:22 2019 +0200
@@ -8,6 +8,6 @@
  */
 interface SocketMatcher {
 
-    public boolean match(IpAddress ip, int port);
+    boolean match(IpAddress ip, int port);
     
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Mon Jul 08 10:54:22 2019 +0200
@@ -0,0 +1,478 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+import com.passus.st.metric.MetricsContainer;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
+
+/**
+ * Base class for selector-driven emitter workers.
+ * <p>
+ * The worker owns a single {@link Selector} and a queue of pending {@link Task}s.
+ * {@link #run()} first executes the queued tasks (connect / flush / close requests)
+ * and then dispatches the readiness events reported by the selector to the
+ * {@link EmitterHandler} stored in the {@link KeyContext} attached to each key.
+ * Subclasses implement {@link #doConnect(SessionInfo, EmitterHandler)} to open and
+ * register the concrete channel type.
+ */
+public abstract class NioAbstractEmitterWorker extends NioEmitterWorker {
+
+    protected final Selector selector;
+
+    /** Requests queued by {@link #connect}, {@link #flush} and {@link #requestClose}; drained in {@link #run()}. */
+    protected final Queue<Task> tasks = new ConcurrentLinkedQueue<>();
+
+    private volatile boolean working = false;
+
+    /**
+     * Creates the worker and opens its selector.
+     *
+     * @param index worker index, passed to the superclass
+     * @throws IOException if the selector cannot be opened
+     */
+    public NioAbstractEmitterWorker(int index) throws IOException {
+        super(index);
+        selector = Selector.open();
+    }
+
+    /**
+     * Sets the working flag. On a transition from working to not working the
+     * selector is closed; a failure to close it is logged and otherwise ignored.
+     *
+     * @param working the new state
+     */
+    @Override
+    public void setWorking(boolean working) {
+        // The previous state has to be captured before the assignment: comparing the
+        // field with the parameter afterwards can never observe the transition.
+        boolean wasWorking = this.working;
+        this.working = working;
+        if (wasWorking && !working) {
+            try {
+                selector.close();
+            } catch (IOException ex) {
+                logger.warn(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    /**
+     * @return the current value of the working flag
+     */
+    public boolean isWorking() {
+        return working;
+    }
+
+    /**
+     * When metrics collection is enabled, publishes the current metric to the
+     * container under the current wall-clock time and then resets it.
+     */
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (collectMetrics) {
+            container.update(System.currentTimeMillis(), metric);
+            metric.reset();
+        }
+    }
+
+    /**
+     * Queues a flush request for the key and wakes the selector up so that the
+     * request is picked up by {@link #run()}.
+     */
+    @Override
+    protected void flush(SelectionKey key) {
+        tasks.add(new FlushTask(key));
+        selector.wakeup();
+    }
+
+    /**
+     * Queues a connect request for the session and wakes the selector up; the
+     * actual connect is performed by {@link #doConnect} from {@link #run()}.
+     */
+    @Override
+    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
+        tasks.add(new ConnectTask(sessionInfo, handler));
+        selector.wakeup();
+    }
+
+    /**
+     * Maps the session to connection parameters using the session mapper.
+     * <p>
+     * When the mapper returns {@code null}, the error is counted (if metrics are
+     * enabled), {@link EmitterHandler#sessionInvalidated} is invoked (exceptions
+     * from the handler are only logged) and {@code null} is returned.
+     *
+     * @return the mapped parameters, or {@code null} if the session cannot be mapped
+     */
+    protected ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) {
+        ConnectionParams connParams = sessionMapper.map(sessionInfo);
+        if (connParams == null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Unable to map session '{}'.", sessionInfo);
+            }
+
+            if (collectMetrics) {
+                synchronized (metric) {
+                    metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
+                }
+            }
+
+            try {
+                handler.sessionInvalidated(sessionInfo);
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+
+            return null;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
+        }
+
+        return connParams;
+    }
+
+    /**
+     * Opens a channel for the session, registers it with {@link #selector} and
+     * starts connecting. Called from {@link #run()} for every {@link ConnectTask}.
+     */
+    protected abstract void doConnect(SessionInfo sessionInfo, EmitterHandler handler);
+
+    /**
+     * Completes a pending connect for the key.
+     * <p>
+     * Polls {@code finishConnect()} until it succeeds or {@code connectionTimeout}
+     * (compared against a {@code System.currentTimeMillis()} difference) has
+     * elapsed. On failure the error is reported through {@link #doCatchException},
+     * the key is cancelled and the connection error is counted. On success the
+     * handler's {@code channelActive} is invoked and read interest is enabled.
+     */
+    protected void doFinishConnect(SelectionKey key) {
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
+        }
+
+        try {
+            NioChannelContext channelContext = keyContext.channelContext;
+            long connStart = System.currentTimeMillis();
+            boolean timeouted = false;
+            while (!channelContext.finishConnect()) {
+                long now = System.currentTimeMillis();
+                // Give up only once the timeout has actually elapsed. The former
+                // '<' comparison reported a timeout on the very first unsuccessful poll.
+                if (now - connStart >= connectionTimeout) {
+                    timeouted = true;
+                    break;
+                }
+            }
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
+
+            if (timeouted) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
+                }
+
+                throw new ConnectException("Connection timed out.");
+
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            key.cancel();
+            if (collectMetrics) {
+                metric.incConnectionsErrors();
+            }
+
+            return;
+        }
+
+        try {
+            if (collectMetrics) {
+                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
+                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
+            }
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
+            }
+
+            keyContext.handler.channelActive(keyContext.channelContext);
+            setOpRead(key);
+        } catch (Exception ex) {
+            logger.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Closes the channel behind the key and notifies the handler
+     * ({@code channelInactive}, then {@code channelUnregistered} after the key is
+     * cancelled). Does nothing except waking the selector up when the channel is
+     * already closed. Exceptions from the channel and the handler are only logged.
+     */
+    protected void doClose(SelectionKey key) {
+        if (!key.channel().isOpen()) {
+            selector.wakeup();
+            return;
+        }
+
+        try {
+            key.channel().close();
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        try {
+            keyContext.handler.channelInactive(keyContext.channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        key.cancel();
+        try {
+            keyContext.handler.channelUnregistered(keyContext.channelContext);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
+        }
+
+        if (collectMetrics) {
+            metric.incClosedConnections();
+        }
+
+        selector.wakeup();
+    }
+
+    /**
+     * Records the error in the metric (if enabled) and forwards it to the
+     * handler's {@code errorOccurred}; exceptions thrown by the handler are logged.
+     */
+    protected void doCatchException(SelectionKey key, Throwable cause) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Error occurred. " + cause.getMessage(), cause);
+        }
+
+        if (collectMetrics) {
+            synchronized (metric) {
+                metric.errorCaught(cause);
+            }
+        }
+
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        try {
+            keyContext.handler.errorOccurred(keyContext.channelContext, cause);
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Tells whether the given operation is part of the key's interest set.
+     * The caller must have checked {@link SelectionKey#isValid()} first.
+     */
+    private static boolean hasInterest(SelectionKey key, int op) {
+        return (key.interestOps() & op) != 0;
+    }
+
+    // The four methods below decide on the key's INTEREST set. isReadable()/isWritable()
+    // report the READY set of the last selection, which stays stale until the key is
+    // selected again and therefore must not be used to decide whether interest is set.
+
+    /** Adds {@code OP_READ} to the key's interest set unless the key is invalid or it is already there. */
+    protected void setOpRead(SelectionKey key) {
+        if (!key.isValid() || hasInterest(key, SelectionKey.OP_READ)) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+        key.selector().wakeup();
+    }
+
+    /** Removes {@code OP_READ} from the key's interest set unless the key is invalid or it is not there. */
+    protected void clearOpRead(SelectionKey key) {
+        if (!key.isValid() || !hasInterest(key, SelectionKey.OP_READ)) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+        key.selector().wakeup();
+    }
+
+    /** Adds {@code OP_WRITE} to the key's interest set unless the key is invalid or it is already there. */
+    protected void setOpWrite(SelectionKey key) {
+        if (!key.isValid() || hasInterest(key, SelectionKey.OP_WRITE)) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        key.selector().wakeup();
+    }
+
+    /** Removes {@code OP_WRITE} from the key's interest set unless the key is invalid or it is not there. */
+    protected void clearOpWrite(SelectionKey key) {
+        if (!key.isValid() || !hasInterest(key, SelectionKey.OP_WRITE)) {
+            return;
+        }
+
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+        key.selector().wakeup();
+    }
+
+    /**
+     * Queues a close request for the key and wakes its selector up; the close
+     * itself is performed by {@link #doClose} from {@link #run()}.
+     */
+    @Override
+    protected void requestClose(SelectionKey key) {
+        tasks.add(new CloseTask(key));
+        key.selector().wakeup();
+    }
+
+    /**
+     * Writes every buffer queued on the key's channel context.
+     * <p>
+     * The handler is told about the start ({@code dataWriteStart}) and the end
+     * ({@code dataWritten}) of the write. A write result of {@code -1} or any
+     * exception closes the channel. Afterwards read interest is enabled and write
+     * interest is cleared.
+     */
+    protected void doWrite(SelectionKey key) {
+        KeyContext keyContext = (KeyContext) key.attachment();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Writing ({} -> {}).",
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        NioChannelContext channelContext = keyContext.channelContext;
+        keyContext.handler.dataWriteStart(channelContext);
+        Queue<ByteBuffer> queue = channelContext.dataQueue();
+        int written = 0;
+        try {
+            ByteBuffer buffer;
+            while (!queue.isEmpty()) {
+                buffer = queue.poll();
+                // NOTE(review): if write0 returns 0 (e.g. a full send buffer on a
+                // non-blocking channel) this inner loop spins on the selector thread.
+                while (buffer.hasRemaining()) {
+                    int res = channelContext.write0(buffer);
+
+                    if (res == -1) {
+                        doClose(key);
+                        return;
+                    }
+
+                    if (collectMetrics) {
+                        metric.updateSentBytes(res);
+                    }
+
+                    written += res;
+                }
+            }
+        } catch (Exception e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Written {}B ({} -> {})", written,
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        // TODO: handler callbacks should be dispatched through an Executor.
+        try {
+            keyContext.handler.dataWritten(keyContext.channelContext);
+            logger.debug("Write handled.");
+        } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+        }
+
+        setOpRead(key);
+        clearOpWrite(key);
+    }
+
+    /**
+     * Reads everything currently available on the key's channel into a fresh
+     * {@link ByteBuff} and, if any bytes were read, passes it to the handler's
+     * {@code dataReceived}. A read result of {@code -1} or an {@link IOException}
+     * closes the channel.
+     */
+    protected void doRead(SelectionKey key) {
+        KeyContext keyContext = (KeyContext) key.attachment();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reading ({} -> {})",
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        ByteBuffer buffer = keyContext.buffer;
+
+        buffer.clear();
+
+        NioChannelContext channelContext = keyContext.channelContext;
+        ByteBuff buff = new HeapByteBuff();
+        int totalReaded = 0;
+        int readed;
+        try {
+            while ((readed = channelContext.read0(buffer)) > 0) {
+                // After flip() position is 0 and limit is the number of bytes just read.
+                buffer.flip();
+                buff.append(buffer.array(), buffer.position(), buffer.limit());
+                buffer.clear();
+                totalReaded += readed;
+
+                if (collectMetrics) {
+                    metric.updateReceivedBytes(readed);
+                }
+            }
+        } catch (IOException e) {
+            doCatchException(key, e);
+            doClose(key);
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Readed {}B ({} -> {})", totalReaded,
+                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
+        }
+
+        if (totalReaded > 0) {
+            try {
+                keyContext.handler.dataReceived(keyContext.channelContext, buff);
+                logger.debug("Read handled.");
+            } catch (Exception e) {
+                logger.debug(e.getMessage(), e);
+            }
+        }
+
+        // -1 means the peer closed the channel; deliver what was read first, then close.
+        if (readed == -1) {
+            doClose(key);
+            return;
+        }
+
+        keyContext.buffer.flip();
+    }
+
+    /**
+     * Worker loop: while the working flag is set, runs the queued tasks, waits on
+     * the selector for at most {@code selectTimeout} and dispatches the selected
+     * keys. The loop also ends when the selector is found closed.
+     */
+    @Override
+    public void run() {
+        working = true;
+        while (working) {
+            runPendingTasks();
+
+            try {
+                // The count is evaluated per iteration, so a failed select can no
+                // longer leave a stale value from the previous round behind.
+                if (selector.select(selectTimeout) > 0) {
+                    dispatchSelectedKeys();
+                }
+            } catch (java.nio.channels.ClosedSelectorException ex) {
+                // setWorking(false) closes the selector, possibly while this thread is
+                // between the flag check and select(); treat it as a stop request.
+                logger.debug(ex.getMessage(), ex);
+                break;
+            } catch (IOException ex) {
+                logger.warn(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    /** Drains the task queue, executing each task according to its code. */
+    private void runPendingTasks() {
+        Task task;
+        while ((task = tasks.poll()) != null) {
+            if (task.code == Task.CLOSE) {
+                doClose(((CloseTask) task).key);
+            } else if (task.code == Task.CONNECT) {
+                ConnectTask taskConn = (ConnectTask) task;
+                doConnect(taskConn.sessionInfo, taskConn.handler);
+            } else if (task.code == Task.FLUSH) {
+                FlushTask flushTask = (FlushTask) task;
+                setOpWrite(flushTask.key);
+            }
+        }
+    }
+
+    /**
+     * Handles every key in the selector's selected-key set, removing each one from
+     * the set. Per key at most one event is handled, in the order connect, write, read.
+     */
+    private void dispatchSelectedKeys() {
+        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+        while (it.hasNext()) {
+            SelectionKey key = it.next();
+            it.remove();
+
+            if (!key.isValid()) {
+                continue;
+            }
+
+            try {
+                if (key.isConnectable()) {
+                    doFinishConnect(key);
+                } else if (key.isWritable()) {
+                    doWrite(key);
+                } else if (key.isReadable()) {
+                    doRead(key);
+                }
+            } catch (java.nio.channels.CancelledKeyException ex) {
+                // The key was cancelled after the validity check (e.g. the selector
+                // was closed concurrently); skip it instead of killing the worker.
+                logger.debug(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    /** Attachment of every registered key: the handler, the channel context and a 1024-byte read buffer. */
+    protected static class KeyContext {
+
+        final EmitterHandler handler;
+
+        final NioChannelContext channelContext;
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+        public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
+            this.channelContext = channelContext;
+            this.handler = handler;
+        }
+
+    }
+
+    /** A request queued for execution in {@link #run()}; {@link #code} selects the action. */
+    protected static abstract class Task {
+
+        public static final int CLOSE = 1;
+        public static final int CONNECT = 2;
+        public static final int FLUSH = 3;
+
+        final int code;
+
+        public Task(int code) {
+            this.code = code;
+        }
+
+    }
+
+    /** Request to connect a session using the given handler. */
+    protected final static class ConnectTask extends Task {
+
+        final SessionInfo sessionInfo;
+        final EmitterHandler handler;
+
+        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
+            super(CONNECT);
+            this.sessionInfo = sessionInfo;
+            this.handler = handler;
+        }
+
+    }
+
+    /** Request to close the channel behind the key. */
+    protected final static class CloseTask extends Task {
+
+        final SelectionKey key;
+
+        public CloseTask(SelectionKey key) {
+            super(CLOSE);
+            this.key = key;
+        }
+
+    }
+
+    /** Request to enable write interest on the key so that queued data gets written. */
+    protected final static class FlushTask extends Task {
+
+        final SelectionKey key;
+
+        public FlushTask(SelectionKey key) {
+            super(FLUSH);
+            this.key = key;
+        }
+
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java	Fri May 31 13:25:02 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,108 +0,0 @@
-package com.passus.st.emitter.nio;
-
-import com.passus.data.ByteBuff;
-import com.passus.net.SocketAddress;
-import com.passus.net.utils.AddressUtils;
-import com.passus.st.emitter.ChannelContext;
-import com.passus.st.emitter.SessionInfo;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class NioChannelContext implements ChannelContext {
-
-    private final NioEmitterWorker worker;
-
-    private final SessionInfo sessionInfo;
-
-    private final SocketChannel channel;
-
-    private final Queue<ByteBuffer> dataQueue;
-
-    private SocketAddress localAddress;
-
-    private SocketAddress remoteAddress;
-
-    private SelectionKey key;
-
-    public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
-        this.worker = worker;
-        this.channel = channel;
-        this.remoteAddress = remoteAddress;
-        this.sessionInfo = sessionInfo;
-        this.dataQueue = new LinkedList<>();
-
-    }
-
-    Queue<ByteBuffer> dataQueue() {
-        return dataQueue;
-    }
-
-    public void selectionKey(SelectionKey key) {
-        this.key = key;
-    }
-
-    private void addToQeueu(ByteBuffer buffer) throws IOException {
-        dataQueue.add(buffer);
-    }
-
-    @Override
-    public boolean isConnected() {
-        return channel.isConnected();
-    }
-
-    @Override
-    public boolean isConnectionPending() {
-        return channel.isConnectionPending();
-    }
-
-    @Override
-    public void write(byte[] data, int offset, int length) throws IOException {
-        addToQeueu(ByteBuffer.wrap(data, offset, length));
-    }
-
-    @Override
-    public void write(ByteBuff data) throws IOException {
-        addToQeueu(data.toNioByteBuffer());
-    }
-
-    @Override
-    public void flush() {
-        worker.flush(key);
-    }
-
-    @Override
-    public void close() throws IOException {
-        worker.requestClose(key);
-    }
-
-    @Override
-    public SocketAddress getLocalAddress() {
-        try {
-            if (localAddress == null && channel.getLocalAddress() != null) {
-                localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress());
-            }
-        } catch (Exception e) {
-        }
-
-        return localAddress;
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return remoteAddress;
-    }
-
-    @Override
-    public SessionInfo getSessionInfo() {
-        return sessionInfo;
-    }
-
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java	Mon Jul 08 10:54:22 2019 +0200
@@ -0,0 +1,54 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * Channel context backed by a {@link DatagramChannel}.
+ * <p>
+ * Reads and writes are delegated straight to the channel; connecting never
+ * stays pending, so {@link #finishConnect()} always reports completion.
+ */
+public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> {
+
+    public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+        super(worker, channel, remoteAddress, sessionInfo);
+    }
+
+    /** @return always {@code true} */
+    @Override
+    boolean finishConnect() throws IOException {
+        return true;
+    }
+
+    /** Writes the buffer to the channel and returns the channel's result. */
+    @Override
+    int write0(ByteBuffer buffer) throws IOException {
+        return channel.write(buffer);
+    }
+
+    /** Reads from the channel into the buffer and returns the channel's result. */
+    @Override
+    int read0(ByteBuffer buffer) throws IOException {
+        return channel.read(buffer);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel.isConnected();
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isConnectionPending() {
+        return false;
+    }
+
+    /**
+     * Returns the local address, resolving it from the channel on first use and
+     * caching it afterwards.
+     *
+     * @return the local address, or {@code null} while the channel has none or it
+     *         cannot be determined
+     */
+    @Override
+    public SocketAddress getLocalAddress() {
+        if (localAddress != null) {
+            return localAddress;
+        }
+
+        try {
+            java.net.SocketAddress jdkAddress = channel.getLocalAddress();
+            if (jdkAddress != null) {
+                localAddress = AddressUtils.jdkSocketToSocketAddress(jdkAddress);
+            }
+        } catch (Exception ignored) {
+            // Best effort: the address simply stays unresolved.
+        }
+
+        return localAddress;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java	Mon Jul 08 10:54:22 2019 +0200
@@ -0,0 +1,73 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper;
+
+import java.io.IOException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
+
+/**
+ * Emitter worker that opens a non-blocking {@link DatagramChannel} for every
+ * connect request.
+ */
+public class NioDatagramEmitterWorker extends NioAbstractEmitterWorker {
+
+    public NioDatagramEmitterWorker(int index) throws IOException {
+        super(index);
+    }
+
+    /**
+     * Opens, optionally binds, registers and connects a datagram channel for the
+     * session.
+     * <p>
+     * Nothing happens when the session cannot be mapped to connection parameters.
+     * When the mapped remote address is {@code null}, the session's destination
+     * IP and port are used. A failure of {@code connect} is reported through
+     * {@code doCatchException}; any other failure is counted in the metric (if
+     * enabled) and logged.
+     */
+    @Override
+    protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        DatagramChannel channel = null;
+        SelectionKey key = null;
+        try {
+            // getConnParams already logs the mapping result at debug level.
+            SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler);
+            if (connParams == null) {
+                return;
+            }
+
+            channel = DatagramChannel.open();
+            channel.configureBlocking(false);
+
+            // Bind explicitly only to a concrete address; ANY_SOCKET leaves the
+            // choice of the local address to the system.
+            SocketAddress bindAddress = connParams.getBindAddress();
+            if (bindAddress != null && !ANY_SOCKET.equals(bindAddress)) {
+                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+            }
+
+            SocketAddress remoteAddress = connParams.getRemoteAddress();
+            if (remoteAddress == null) {
+                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+            }
+
+            NioDatagramChannelContext channelContext = new NioDatagramChannelContext(this, channel, remoteAddress, sessionInfo);
+            KeyContext keyContext = new KeyContext(channelContext, handler);
+            // A DatagramChannel supports only OP_READ and OP_WRITE; registering it with
+            // OP_CONNECT is rejected with IllegalArgumentException. Start with an empty
+            // interest set - read interest is enabled once the channel is connected.
+            key = channel.register(selector, 0, keyContext);
+            // Attach the key before the first callback so that the handler can already
+            // flush or close through the context.
+            channelContext.selectionKey(key);
+            try {
+                handler.channelRegistered(channelContext);
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+            }
+
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+                return;
+            }
+
+            // A datagram connect completes immediately and never raises a connect
+            // event, so the connect completion is run directly.
+            doFinishConnect(key);
+        } catch (Exception e) {
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+
+            // The channel never got registered, so nobody else can close it.
+            if (key == null && channel != null) {
+                try {
+                    channel.close();
+                } catch (IOException closeEx) {
+                    logger.debug(closeEx.getMessage(), closeEx);
+                }
+            }
+        }
+
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java	Fri May 31 13:25:02 2019 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,520 +0,0 @@
-package com.passus.st.emitter.nio;
-
-import com.passus.data.ByteBuff;
-import com.passus.data.HeapByteBuff;
-import com.passus.net.SocketAddress;
-import com.passus.net.utils.AddressUtils;
-import com.passus.st.emitter.EmitterHandler;
-import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
-import com.passus.st.emitter.SessionMapper.ConnectionParams;
-import com.passus.st.emitter.SessionInfo;
-import com.passus.st.metric.MetricsContainer;
-import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-public class NioDefaultEmitterWorker extends NioEmitterWorker {
-
-    private final Selector selector;
-
-    private volatile boolean working = false;
-
-    private final Queue<Task> tasks = new ConcurrentLinkedQueue<>();
-
-    public NioDefaultEmitterWorker(int index) throws IOException {
-        super(index);
-        selector = Selector.open();
-    }
-
-    @Override
-    public void setWorking(boolean working) {
-        this.working = working;
-        if (this.working && !working) {
-            try {
-                selector.close();
-            } catch (IOException ex) {
-                logger.warn(ex.getMessage(), ex);
-            }
-        }
-    }
-
-    public boolean isWorking() {
-        return working;
-    }
-
-    @Override
-    public void writeMetrics(MetricsContainer container) {
-        if (collectMetrics) {
-            container.update(System.currentTimeMillis(), metric);
-            metric.reset();
-        }
-    }
-
-    @Override
-    public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException {
-        tasks.add(new ConnectTask(sessionInfo, handler));
-        selector.wakeup();
-    }
-
-    @Override
-    protected void flush(SelectionKey key) {
-        tasks.add(new FlushTask(key));
-        selector.wakeup();
-    }
-
-    private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
-        try {
-            ConnectionParams connParams = sessionMapper.map(sessionInfo);
-            if (connParams == null) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Unable to map session '{}'.", sessionInfo);
-                }
-
-                if (collectMetrics) {
-                    synchronized (metric) {
-                        metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID);
-                    }
-                }
-
-                try {
-                    handler.sessionInvalidated(sessionInfo);
-                } catch (Exception e) {
-                    logger.debug(e.getMessage(), e);
-                }
-
-                return;
-            }
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams);
-            }
-
-            SocketChannel channel = SocketChannel.open();
-            channel.configureBlocking(false);
-
-            SocketAddress bindAddress = connParams.getBindAddress();
-            if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
-                channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
-            }
-
-            SocketAddress remoteAddress = connParams.getRemoteAddress();
-            if (remoteAddress == null) {
-                remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
-            }
-
-            NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo);
-            KeyContext keyContext = new KeyContext(channelContext, handler);
-            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
-            try {
-                handler.channelRegistered(channelContext);
-            } catch (Exception ex) {
-                doCatchException(key, ex);
-            }
-
-            channelContext.selectionKey(key);
-            try {
-                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
-            } catch (Exception ex) {
-                doCatchException(key, ex);
-                return;
-            }
-
-            selector.wakeup();
-        } catch (Exception e) {
-            if (collectMetrics) {
-                metric.errorCaught(e);
-            }
-            logger.error(e.getMessage(), e);
-        }
-
-    }
-
-    private void doFinishConnect(SelectionKey key) {
-        SocketChannel channel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'.");
-        }
-
-        try {
-            long connStart = System.currentTimeMillis();
-            boolean timeouted = false;
-            while (!channel.finishConnect()) {
-                long now = System.currentTimeMillis();
-                if (now - connStart < connectionTimeout) {
-                    timeouted = true;
-                    break;
-                }
-            }
-            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
-
-            if (timeouted) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress());
-                }
-
-                throw new ConnectException("Connection timed out.");
-
-            }
-        } catch (Exception e) {
-            doCatchException(key, e);
-            key.cancel();
-            if (collectMetrics) {
-                metric.incConnectionsErrors();
-            }
-
-            return;
-        }
-
-        try {
-            if (collectMetrics) {
-                metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress());
-                metric.addBindSocket(keyContext.channelContext.getLocalAddress());
-            }
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress());
-            }
-
-            keyContext.handler.channelActive(keyContext.channelContext);
-            setOpRead(key);
-        } catch (Exception ex) {
-            logger.error(ex.getMessage(), ex);
-        }
-    }
-
-    private void doWrite(SelectionKey key) {
-        SocketChannel socketChannel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-        if (logger.isDebugEnabled()) {
-            logger.debug("Writing ({} -> {}).",
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        keyContext.handler.dataWriteStart(keyContext.channelContext);
-        Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue();
-        int written = 0;
-        try {
-            ByteBuffer buffer;
-            while (!queue.isEmpty()) {
-                buffer = queue.poll();
-                while (buffer.hasRemaining()) {
-                    int res = socketChannel.write(buffer);
-
-                    if (res == -1) {
-                        doClose(key);
-                        return;
-                    }
-
-                    if (collectMetrics) {
-                        metric.updateSentBytes(res);
-                    }
-
-                    written += res;
-                }
-            }
-        } catch (Exception e) {
-            doCatchException(key, e);
-            doClose(key);
-            return;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Written {}B ({} -> {})", written,
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        //TODO Operacje na handlerach powinny przechodzic przez Executor
-        try {
-            keyContext.handler.dataWritten(keyContext.channelContext);
-            logger.debug("Write handled.");
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-
-        setOpRead(key);
-        clearOpWrite(key);
-    }
-
-    private void doRead(SelectionKey key) {
-        SocketChannel channel = (SocketChannel) key.channel();
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Reading ({} -> {})",
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        ByteBuffer buffer = keyContext.buffer;
-
-        buffer.clear();
-
-        ByteBuff buff = new HeapByteBuff();
-        int totalReaded = 0;
-        int readed;
-        try {
-            while ((readed = channel.read(buffer)) > 0) {
-                buffer.flip();
-                buff.append(buffer.array(), buffer.position(), buffer.limit());
-                buffer.clear();
-                totalReaded += readed;
-
-                if (collectMetrics) {
-                    metric.updateReceivedBytes(readed);
-                }
-            }
-        } catch (IOException e) {
-            doCatchException(key, e);
-            doClose(key);
-            return;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Readed {}B ({} -> {})", totalReaded,
-                    keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
-        }
-
-        if (totalReaded > 0) {
-            try {
-                keyContext.handler.dataReceived(keyContext.channelContext, buff);
-                logger.debug("Read handled.");
-            } catch (Exception e) {
-                logger.debug(e.getMessage(), e);
-            }
-        }
-
-        if (readed == -1) {
-            doClose(key);
-            return;
-        }
-
-        keyContext.buffer.flip();
-    }
-
-    private void doCatchException(SelectionKey key, Throwable cause) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Error occured. " + cause.getMessage(), cause);
-        }
-
-        if (collectMetrics) {
-            synchronized (metric) {
-                metric.errorCaught(cause);
-            }
-        }
-
-        KeyContext keyContext = (KeyContext) key.attachment();
-
-        try {
-            keyContext.handler.errorOccurred(keyContext.channelContext, cause);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-    }
-
-    private void doClose(SelectionKey key) {
-        if (!key.channel().isOpen()) {
-            selector.wakeup();
-            return;
-        }
-
-        try {
-            key.channel().close();
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-
-        KeyContext keyContext = (KeyContext) key.attachment();
-        if (logger.isDebugEnabled()) {
-            logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'.");
-        }
-
-        try {
-            keyContext.handler.channelInactive(keyContext.channelContext);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-
-        key.cancel();
-        try {
-            keyContext.handler.channelUnregistered(keyContext.channelContext);
-        } catch (Exception e) {
-            logger.debug(e.getMessage(), e);
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'.");
-        }
-
-        if (collectMetrics) {
-            metric.incClosedConnections();
-        }
-
-        selector.wakeup();
-    }
-
-    static void setOpRead(SelectionKey key) {
-        if (!key.isValid() || key.isReadable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-        key.selector().wakeup();
-    }
-
-    static void clearOpRead(SelectionKey key) {
-        if (!key.isValid() || !key.isReadable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-        key.selector().wakeup();
-    }
-
-    static void setOpWrite(SelectionKey key) {
-        if (!key.isValid() || key.isWritable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-        key.selector().wakeup();
-    }
-
-    static void clearOpWrite(SelectionKey key) {
-        if (!key.isValid() || !key.isWritable()) {
-            return;
-        }
-
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-        key.selector().wakeup();
-    }
-
-    @Override
-    protected void requestClose(SelectionKey key) {
-        tasks.add(new CloseTask(key));
-        key.selector().wakeup();
-    }
-
-    @Override
-    public void run() {
-        int selected = 0;
-        working = true;
-        while (working) {
-            if (!tasks.isEmpty()) {
-                Task task;
-                while ((task = tasks.poll()) != null) {
-                    if (task.code == Task.CLOSE) {
-                        doClose(((CloseTask) task).key);
-                    } else if (task.code == Task.CONNECT) {
-                        ConnectTask taskConn = (ConnectTask) task;
-                        doConnect(taskConn.sessionInfo, taskConn.handler);
-                    } else if (task.code == Task.FLUSH) {
-                        FlushTask flushTask = (FlushTask) task;
-                        setOpWrite(flushTask.key);
-                    }
-                }
-            }
-
-            try {
-                selected = selector.select(selectTimeout);
-            } catch (IOException ex) {
-                logger.warn(ex.getMessage(), ex);
-            }
-
-            if (selected > 0) {
-                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
-                while (it.hasNext()) {
-                    SelectionKey key = it.next();
-                    it.remove();
-
-                    if (!key.isValid()) {
-                        continue;
-                    }
-
-                    if (key.isConnectable()) {
-                        doFinishConnect(key);
-                    } else if (key.isWritable()) {
-                        doWrite(key);
-                    } else if (key.isReadable()) {
-                        doRead(key);
-                    }
-                }
-            }
-        }
-    }
-
-    private static class KeyContext {
-
-        private final EmitterHandler handler;
-
-        private final NioChannelContext channelContext;
-
-        private ByteBuffer buffer = ByteBuffer.allocate(1024);
-
-        public KeyContext(NioChannelContext channelContext, EmitterHandler handler) {
-            this.channelContext = channelContext;
-            this.handler = handler;
-        }
-
-    }
-
-    private static abstract class Task {
-
-        public static final int CLOSE = 1;
-        public static final int CONNECT = 2;
-        public static final int FLUSH = 3;
-
-        private final int code;
-
-        public Task(int code) {
-            this.code = code;
-        }
-
-    }
-
-    private final static class ConnectTask extends Task {
-
-        private final SessionInfo sessionInfo;
-        private final EmitterHandler handler;
-
-        public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) {
-            super(CONNECT);
-            this.sessionInfo = sessionInfo;
-            this.handler = handler;
-        }
-
-    }
-
-    private final static class CloseTask extends Task {
-
-        private final SelectionKey key;
-
-        public CloseTask(SelectionKey key) {
-            super(CLOSE);
-            this.key = key;
-        }
-
-    }
-
-    private final static class FlushTask extends Task {
-
-        private final SelectionKey key;
-
-        public FlushTask(SelectionKey key) {
-            super(FLUSH);
-            this.key = key;
-        }
-
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java	Mon Jul 08 10:54:22 2019 +0200
@@ -58,7 +58,7 @@
     private final Class<? extends NioEmitterWorker> workerClass;
 
     public NioEmitter() {
-        this(NioDefaultEmitterWorker.class);
+        this(NioSocketEmitterWorker.class);
     }
 
     public NioEmitter(Class<? extends NioEmitterWorker> workerClass) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java	Mon Jul 08 10:54:22 2019 +0200
@@ -0,0 +1,107 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.SessionInfo;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Channel context backed by a stream-oriented {@link SocketChannel}.
+ * <p>
+ * All I/O and connection-state calls are forwarded to the wrapped channel.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioSocketChannelContext extends NioChannelContext<SocketChannel> {
+
+    /**
+     * Passes all arguments through to the {@link NioChannelContext} constructor.
+     *
+     * @param worker        worker that created this context
+     * @param channel       socket channel wrapped by this context
+     * @param remoteAddress address of the remote peer
+     * @param sessionInfo   session this context belongs to
+     */
+    public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) {
+        super(worker, channel, remoteAddress, sessionInfo);
+    }
+
+    /**
+     * Writes the buffer to the socket channel.
+     *
+     * @param buffer data to send
+     * @return value returned by {@link SocketChannel#write(ByteBuffer)}
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    int write0(ByteBuffer buffer) throws IOException {
+        return channel.write(buffer);
+    }
+
+    /**
+     * Reads from the socket channel into the buffer.
+     *
+     * @param buffer destination for received data
+     * @return value returned by {@link SocketChannel#read(ByteBuffer)}
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    int read0(ByteBuffer buffer) throws IOException {
+        return channel.read(buffer);
+    }
+
+    /**
+     * Completes a pending connection on the socket channel.
+     *
+     * @return value returned by {@link SocketChannel#finishConnect()}
+     * @throws IOException if the connection attempt fails
+     */
+    @Override
+    boolean finishConnect() throws IOException {
+        return channel.finishConnect();
+    }
+
+    /**
+     * @return {@code true} if the socket channel reports itself connected
+     */
+    @Override
+    public boolean isConnected() {
+        return channel.isConnected();
+    }
+
+    /**
+     * @return {@code true} if the socket channel reports a connection in progress
+     */
+    @Override
+    public boolean isConnectionPending() {
+        return channel.isConnectionPending();
+    }
+
+    /**
+     * Returns the local address of the channel. The address is looked up on the channel
+     * until a non-null value is obtained, then kept in {@code localAddress}.
+     *
+     * @return the local address, or {@code null} if it is not available
+     */
+    @Override
+    public SocketAddress getLocalAddress() {
+        if (localAddress != null) {
+            return localAddress;
+        }
+
+        try {
+            java.net.SocketAddress jdkAddress = channel.getLocalAddress();
+            if (jdkAddress != null) {
+                localAddress = AddressUtils.jdkSocketToSocketAddress(jdkAddress);
+            }
+        } catch (Exception ignored) {
+            // Best effort: a failed lookup leaves the address unresolved.
+        }
+
+        return localAddress;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketEmitterWorker.java	Mon Jul 08 10:54:22 2019 +0200
@@ -0,0 +1,108 @@
+package com.passus.st.emitter.nio;
+
+import com.passus.net.SocketAddress;
+import com.passus.net.utils.AddressUtils;
+import com.passus.st.emitter.EmitterHandler;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.emitter.SessionMapper.ConnectionParams;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import static com.passus.st.emitter.SessionMapper.ANY_SOCKET;
+
+/**
+ * Emitter worker that opens a non-blocking {@link SocketChannel} for every session.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NioSocketEmitterWorker extends NioAbstractEmitterWorker {
+
+    /**
+     * @param index worker index, passed to the superclass
+     * @throws IOException propagated from the superclass constructor
+     */
+    public NioSocketEmitterWorker(int index) throws IOException {
+        super(index);
+    }
+
+    /**
+     * Opens a non-blocking socket channel for the session, registers it with the selector
+     * for {@link SelectionKey#OP_CONNECT} and starts connecting to the remote address.
+     * Does nothing when {@code getConnParams} yields no connection parameters.
+     * <p>
+     * Exceptions are not propagated: those raised once the channel is registered are
+     * reported through {@code doCatchException}; any other is logged and, when metrics
+     * are collected, passed to the metric. A channel that fails before registration is
+     * closed here, because nothing else holds a reference to it at that point.
+     *
+     * @param sessionInfo session to connect
+     * @param handler     handler notified about channel events
+     */
+    @Override
+    protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) {
+        try {
+            ConnectionParams connParams = getConnParams(sessionInfo, handler);
+            if (connParams == null) {
+                return;
+            }
+
+            SocketChannel channel = SocketChannel.open();
+            SocketAddress remoteAddress;
+            NioSocketChannelContext channelContext;
+            SelectionKey key;
+            try {
+                channel.configureBlocking(false);
+
+                // NOTE(review): binding only when the mapped address equals ANY_SOCKET looks
+                // inverted (a specific bind address is never applied) - confirm the intent.
+                SocketAddress bindAddress = connParams.getBindAddress();
+                if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) {
+                    channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress));
+                }
+
+                // Fall back to the destination recorded in the session when none is mapped.
+                remoteAddress = connParams.getRemoteAddress();
+                if (remoteAddress == null) {
+                    remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort());
+                }
+
+                channelContext = new NioSocketChannelContext(this, channel, remoteAddress, sessionInfo);
+                KeyContext keyContext = new KeyContext(channelContext, handler);
+                key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext);
+            } catch (Exception ex) {
+                // Not registered yet, so no one else can release the channel: close it here.
+                try {
+                    channel.close();
+                } catch (IOException closeEx) {
+                    ex.addSuppressed(closeEx);
+                }
+                throw ex;
+            }
+
+            try {
+                handler.channelRegistered(channelContext);
+            } catch (Exception ex) {
+                doCatchException(key, ex);
+            }
+
+            channelContext.selectionKey(key);
+            try {
+                channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress));
+            } catch (Exception ex) {
+                // Reported through doCatchException; the channel is not closed or deregistered here.
+                doCatchException(key, ex);
+                return;
+            }
+
+            selector.wakeup();
+        } catch (Exception e) {
+            if (collectMetrics) {
+                metric.errorCaught(e);
+            }
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java	Mon Jul 08 10:54:22 2019 +0200
@@ -8,7 +8,6 @@
 import com.passus.st.emitter.nio.NioEmitter;
 import com.passus.st.metric.ScheduledMetricsCollector;
 import com.passus.st.project.Project;
-import com.passus.st.reporter.snmp.SnmpLogger;
 import com.passus.st.source.PcapSessionEventSource;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -16,7 +15,6 @@
 import java.util.*;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class JobExecutor {
@@ -174,8 +172,6 @@
 
         private ScheduledMetricsCollector metricsCollector;
 
-        private SnmpLogger snmpLogger;
-
         private JobThread(JobContext context) {
             super("Job-" + context.getJobId());
             this.context = context;
@@ -378,11 +374,6 @@
                 }
             }
 
-            if (snmpLogger != null) {
-                snmpLogger.close();
-                LOGGER.debug("SNMP logger stopped.");
-            }
-
             metricsCollector.flush(true);
             metricsCollector.stop();
             changeStatus(JobStatus.STOPPED);
--- a/stress-tester/src/test/java/com/passus/st/client/http/ReporterConfiguratorTest.java	Fri May 31 13:25:02 2019 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/http/ReporterConfiguratorTest.java	Mon Jul 08 10:54:22 2019 +0200
@@ -6,14 +6,16 @@
 import com.passus.config.YamlConfigurationReader;
 import com.passus.config.validation.Errors;
 import com.passus.st.reporter.trx.SocketReporterClient;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
 import static com.passus.st.utils.ConfigurationContextConsts.REPORTER_DEFAULT_DESTINATION;
 import static com.passus.st.utils.ConfigurationContextConsts.REPORTER_DESTINATIONS;
-import java.util.List;
-import static org.testng.AssertJUnit.*;
-import org.testng.annotations.Test;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
 
 /**
- *
  * @author mikolaj.podbielski
  */
 public class ReporterConfiguratorTest {
@@ -102,10 +104,10 @@
 
         configure(cfgString, context);
 
-        ReporterRemoteDestination remote = (ReporterRemoteDestination) context.get(REPORTER_DEFAULT_DESTINATION);
+        ReporterRemoteDestination remote = context.get(REPORTER_DEFAULT_DESTINATION);
         checkRemote(remote, "1.1.1.1", 2222, 3);
 
-        List<ReporterDestination> destinations = (List<ReporterDestination>) context.get(REPORTER_DESTINATIONS);
+        List<ReporterDestination> destinations = context.get(REPORTER_DESTINATIONS);
         assertNotNull(destinations);
         assertEquals(3, destinations.size());