Mercurial > stress-tester
changeset 967:57e0d4394116
NioDatagramChannelContext in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Mon Jul 08 10:54:22 2019 +0200 @@ -2,41 +2,40 @@ import com.passus.data.ByteBuff; import com.passus.net.SocketAddress; -import com.passus.st.emitter.SessionInfo; + import java.io.IOException; /** - * * @author Mirosław Hawrot */ public interface ChannelContext { - public boolean isConnected(); - - public boolean isConnectionPending(); + boolean isConnected(); - public void write(byte[] data, int offset, int length) throws IOException; + boolean isConnectionPending(); - public void write(ByteBuff data) throws IOException; + void write(byte[] data, int offset, int length) throws IOException; - public default void writeAndFlush(byte[] data, int offset, int length) throws IOException { + void write(ByteBuff data) throws IOException; + + default void writeAndFlush(byte[] data, int offset, int length) throws IOException { write(data, offset, length); flush(); } - public default void writeAndFlush(ByteBuff data) throws IOException { + default void writeAndFlush(ByteBuff data) throws IOException { write(data); flush(); } - public void flush() throws IOException; - - public void close() throws IOException; + void flush() throws IOException; - public SocketAddress getLocalAddress(); + void close() throws IOException; - public SocketAddress getRemoteAddress(); + SocketAddress getLocalAddress(); - public SessionInfo getSessionInfo(); + SocketAddress getRemoteAddress(); + + SessionInfo getSessionInfo(); }
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Mon Jul 08 10:54:22 2019 +0200 @@ -8,38 +8,30 @@ public interface EmitterHandler { default void channelRegistered(ChannelContext context) throws Exception { - } default void channelUnregistered(ChannelContext context) throws Exception { - } default void channelActive(ChannelContext context) throws Exception { - } default void channelInactive(ChannelContext context) throws Exception { - } default void dataReceived(ChannelContext context, ByteBuff data) throws Exception { } default void dataWriteStart(ChannelContext context) { - } default void dataWritten(ChannelContext context) throws Exception { - } default void sessionInvalidated(SessionInfo session) throws Exception { - } default void errorOccurred(ChannelContext context, Throwable cause) throws Exception { - } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionMapper.java Mon Jul 08 10:54:22 2019 +0200 @@ -5,18 +5,17 @@ import com.passus.net.SocketAddress; /** - * * @author Mirosław Hawrot */ public interface SessionMapper { - public static final SocketAddress ANY_SOCKET = new SocketAddress(Ip4Address.EMPTY_ADDRESS, 0); + SocketAddress ANY_SOCKET = new SocketAddress(Ip4Address.EMPTY_ADDRESS, 0); - public boolean invalidate(SessionInfo sessionInfo); + boolean invalidate(SessionInfo sessionInfo); - public ConnectionParams map(SessionInfo sessionInfo); + ConnectionParams map(SessionInfo sessionInfo); - public static class ConnectionParams { + class ConnectionParams { private final SocketAddress remoteAddress;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SocketMatcher.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SocketMatcher.java Mon Jul 08 10:54:22 2019 +0200 @@ -8,6 +8,6 @@ */ interface SocketMatcher { - public boolean match(IpAddress ip, int port); + boolean match(IpAddress ip, int port); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Mon Jul 08 10:54:22 2019 +0200 @@ -0,0 +1,478 @@ +package com.passus.st.emitter.nio; + +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import com.passus.st.metric.MetricsContainer; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; + +public abstract class NioAbstractEmitterWorker extends NioEmitterWorker { + + protected final Selector selector; + + protected final Queue<Task> tasks = new ConcurrentLinkedQueue<>(); + + private volatile boolean working = false; + + public NioAbstractEmitterWorker(int index) throws IOException { + super(index); + selector = Selector.open(); + } + + @Override + public void setWorking(boolean working) { + this.working = working; + if (this.working && !working) { + try { + selector.close(); + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + } + } + + public boolean isWorking() { + return working; + } + + @Override + public void writeMetrics(MetricsContainer container) { + if (collectMetrics) { + container.update(System.currentTimeMillis(), metric); + metric.reset(); + } + } + + @Override + protected void flush(SelectionKey key) { + tasks.add(new FlushTask(key)); + selector.wakeup(); + } + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { + tasks.add(new ConnectTask(sessionInfo, handler)); + selector.wakeup(); + } + + protected ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) { + ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to map session '{}'.", sessionInfo); + } + + if (collectMetrics) { + synchronized (metric) { + metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); + } + } + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + return null; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + return connParams; + } + + protected abstract void doConnect(SessionInfo sessionInfo, EmitterHandler handler); + + protected void doFinishConnect(SelectionKey key) { + KeyContext keyContext = (KeyContext) key.attachment(); + + if (logger.isDebugEnabled()) { + logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); + } + + try { + NioChannelContext channelContext = keyContext.channelContext; + long connStart = System.currentTimeMillis(); + boolean timeouted = false; + while (!channelContext.finishConnect()) { + long now = System.currentTimeMillis(); + if (now - connStart < connectionTimeout) { + timeouted = true; + break; + } + } + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + + if (timeouted) { + if (logger.isDebugEnabled()) { + logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); + } + + throw new ConnectException("Connection timed out."); + + } + } catch (Exception e) { + doCatchException(key, e); + key.cancel(); + if (collectMetrics) { + metric.incConnectionsErrors(); + } + + return; + } + + try { + if (collectMetrics) { + metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); + metric.addBindSocket(keyContext.channelContext.getLocalAddress()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); + } + + keyContext.handler.channelActive(keyContext.channelContext); + setOpRead(key); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } + } + + protected void doClose(SelectionKey key) { + if (!key.channel().isOpen()) { + selector.wakeup(); + return; + } + + try { + key.channel().close(); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + KeyContext keyContext = (KeyContext) key.attachment(); + if (logger.isDebugEnabled()) { + logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + try { + keyContext.handler.channelInactive(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + key.cancel(); + try { + keyContext.handler.channelUnregistered(keyContext.channelContext); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); + } + + if (collectMetrics) { + metric.incClosedConnections(); + } + + selector.wakeup(); + } + + protected void doCatchException(SelectionKey key, Throwable cause) { + if (logger.isDebugEnabled()) { + logger.debug("Error occurred. " + cause.getMessage(), cause); + } + + if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + } + + KeyContext keyContext = (KeyContext) key.attachment(); + + try { + keyContext.handler.errorOccurred(keyContext.channelContext, cause); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + protected void setOpRead(SelectionKey key) { + if (!key.isValid() || key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + key.selector().wakeup(); + } + + protected void clearOpRead(SelectionKey key) { + if (!key.isValid() || !key.isReadable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + key.selector().wakeup(); + } + + protected void setOpWrite(SelectionKey key) { + if (!key.isValid() || key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + key.selector().wakeup(); + } + + protected void clearOpWrite(SelectionKey key) { + if (!key.isValid() || !key.isWritable()) { + return; + } + + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + key.selector().wakeup(); + } + + @Override + protected void requestClose(SelectionKey key) { + tasks.add(new CloseTask(key)); + key.selector().wakeup(); + } + + protected void doWrite(SelectionKey key) { + KeyContext keyContext = (KeyContext) key.attachment(); + if (logger.isDebugEnabled()) { + logger.debug("Writing ({} -> {}).", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + NioChannelContext channelContext = keyContext.channelContext; + keyContext.handler.dataWriteStart(channelContext); + Queue<ByteBuffer> queue = channelContext.dataQueue(); + int written = 0; + try { + ByteBuffer buffer; + while (!queue.isEmpty()) { + buffer = queue.poll(); + while (buffer.hasRemaining()) { + int res = channelContext.write0(buffer); + + if (res == -1) { + doClose(key); + return; + } + + if (collectMetrics) { + metric.updateSentBytes(res); + } + + written += res; + } + } + } catch (Exception e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Written {}B ({} -> {})", written, + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + //TODO Operacje na handlerach powinny przechodzic przez Executor + try { + keyContext.handler.dataWritten(keyContext.channelContext); + logger.debug("Write handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + setOpRead(key); + clearOpWrite(key); + } + + protected void doRead(SelectionKey key) { + KeyContext keyContext = (KeyContext) key.attachment(); + + if (logger.isDebugEnabled()) { + logger.debug("Reading ({} -> {})", + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + ByteBuffer buffer = keyContext.buffer; + + buffer.clear(); + + NioChannelContext channelContext = keyContext.channelContext; + ByteBuff buff = new HeapByteBuff(); + int totalReaded = 0; + int readed; + try { + while ((readed = channelContext.read0(buffer)) > 0) { + buffer.flip(); + buff.append(buffer.array(), buffer.position(), buffer.limit()); + buffer.clear(); + totalReaded += readed; + + if (collectMetrics) { + metric.updateReceivedBytes(readed); + } + } + } catch (IOException e) { + doCatchException(key, e); + doClose(key); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Readed {}B ({} -> {})", totalReaded, + keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); + } + + if (totalReaded > 0) { + try { + keyContext.handler.dataReceived(keyContext.channelContext, buff); + logger.debug("Read handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + if (readed == -1) { + doClose(key); + return; + } + + keyContext.buffer.flip(); + } + + @Override + public void run() { + int selected = 0; + working = true; + while (working) { + if (!tasks.isEmpty()) { + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + } else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doConnect(taskConn.sessionInfo, taskConn.handler); + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); + } + } + } + + try { + selected = selector.select(selectTimeout); + } catch (IOException ex) { + logger.warn(ex.getMessage(), ex); + } + + if (selected > 0) { + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isConnectable()) { + doFinishConnect(key); + } else if (key.isWritable()) { + doWrite(key); + } else if (key.isReadable()) { + doRead(key); + } + } + } + } + } + + protected static class KeyContext { + + final EmitterHandler handler; + + final NioChannelContext channelContext; + + ByteBuffer buffer = ByteBuffer.allocate(1024); + + public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { + this.channelContext = channelContext; + this.handler = handler; + } + + } + + protected static abstract class Task { + + public static final int CLOSE = 1; + public static final int CONNECT = 2; + public static final int FLUSH = 3; + + final int code; + + public Task(int code) { + this.code = code; + } + + } + + protected final static class ConnectTask extends Task { + + final SessionInfo sessionInfo; + final EmitterHandler handler; + + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { + super(CONNECT); + this.sessionInfo = sessionInfo; + this.handler = handler; + } + + } + + protected final static class CloseTask extends Task { + + final SelectionKey key; + + public CloseTask(SelectionKey key) { + super(CLOSE); + this.key = key; + } + + } + + protected final static class FlushTask extends Task { + + final SelectionKey key; + + public FlushTask(SelectionKey key) { + super(FLUSH); + this.key = key; + } + + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Fri May 31 13:25:02 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,108 +0,0 @@ -package com.passus.st.emitter.nio; - -import com.passus.data.ByteBuff; -import com.passus.net.SocketAddress; -import com.passus.net.utils.AddressUtils; -import com.passus.st.emitter.ChannelContext; -import com.passus.st.emitter.SessionInfo; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.LinkedList; -import java.util.Queue; - -/** - * - * @author Mirosław Hawrot - */ -public class NioChannelContext implements ChannelContext { - - private final NioEmitterWorker worker; - - private final SessionInfo sessionInfo; - - private final SocketChannel channel; - - private final Queue<ByteBuffer> dataQueue; - - private SocketAddress localAddress; - - private SocketAddress remoteAddress; - - private SelectionKey key; - - public NioChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { - this.worker = worker; - this.channel = channel; - this.remoteAddress = remoteAddress; - this.sessionInfo = sessionInfo; - this.dataQueue = new LinkedList<>(); - - } - - Queue<ByteBuffer> dataQueue() { - return dataQueue; - } - - public void selectionKey(SelectionKey key) { - this.key = key; - } - - private void addToQeueu(ByteBuffer buffer) throws IOException { - dataQueue.add(buffer); - } - - @Override - public boolean isConnected() { - return channel.isConnected(); - } - - @Override - public boolean isConnectionPending() { - return channel.isConnectionPending(); - } - - @Override - public void write(byte[] data, int offset, int length) throws IOException { - addToQeueu(ByteBuffer.wrap(data, offset, length)); - } - - @Override - public void write(ByteBuff data) throws IOException { - addToQeueu(data.toNioByteBuffer()); - } - - @Override - public void flush() { - worker.flush(key); - } - - @Override - public void close() throws IOException { - worker.requestClose(key); - } - - @Override - public SocketAddress getLocalAddress() { - try { - if (localAddress == null && channel.getLocalAddress() != null) { - localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress()); - } - } catch (Exception e) { - } - - return localAddress; - } - - @Override - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public SessionInfo getSessionInfo() { - return sessionInfo; - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java Mon Jul 08 10:54:22 2019 +0200 @@ -0,0 +1,54 @@ +package com.passus.st.emitter.nio; + +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.SessionInfo; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> { + + public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { + super(worker, channel, remoteAddress, sessionInfo); + } + + @Override + boolean finishConnect() throws IOException { + return true; + } + + @Override + int write0(ByteBuffer buffer) throws IOException { + return channel.write(buffer); + } + + @Override + int read0(ByteBuffer buffer) throws IOException { + return channel.read(buffer); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return false; + } + + @Override + public SocketAddress getLocalAddress() { + try { + if (localAddress == null && channel.getLocalAddress() != null) { + localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress()); + } + } catch (Exception e) { + } + + return localAddress; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramEmitterWorker.java Mon Jul 08 10:54:22 2019 +0200 @@ -0,0 +1,73 @@ +package com.passus.st.emitter.nio; + +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; + +import java.io.IOException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; +import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; + +public class NioDatagramEmitterWorker extends NioAbstractEmitterWorker { + + public NioDatagramEmitterWorker(int index) throws IOException { + super(index); + } + + @Override + protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + try { + SessionMapper.ConnectionParams connParams = getConnParams(sessionInfo, handler); + if (connParams == null) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + DatagramChannel channel = DatagramChannel.open(); + channel.configureBlocking(false); + + SocketAddress bindAddress = connParams.getBindAddress(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + NioDatagramChannelContext channelContext = new NioDatagramChannelContext(this, channel, remoteAddress, sessionInfo); + KeyContext keyContext = new KeyContext(channelContext, handler); + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(key, ex); + } + + channelContext.selectionKey(key); + try { + channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); + } catch (Exception ex) { + doCatchException(key, ex); + return; + } + + selector.wakeup(); + } catch (Exception e) { + if (collectMetrics) { + metric.errorCaught(e); + } + logger.error(e.getMessage(), e); + } + + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDefaultEmitterWorker.java Fri May 31 13:25:02 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,520 +0,0 @@ -package com.passus.st.emitter.nio; - -import com.passus.data.ByteBuff; -import com.passus.data.HeapByteBuff; -import com.passus.net.SocketAddress; -import com.passus.net.utils.AddressUtils; -import com.passus.st.emitter.EmitterHandler; -import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; -import com.passus.st.emitter.SessionMapper.ConnectionParams; -import com.passus.st.emitter.SessionInfo; -import com.passus.st.metric.MetricsContainer; -import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * - * @author Mirosław Hawrot - */ -public class NioDefaultEmitterWorker extends NioEmitterWorker { - - private final Selector selector; - - private volatile boolean working = false; - - private final Queue<Task> tasks = new ConcurrentLinkedQueue<>(); - - public NioDefaultEmitterWorker(int index) throws IOException { - super(index); - selector = Selector.open(); - } - - @Override - public void setWorking(boolean working) { - this.working = working; - if (this.working && !working) { - try { - selector.close(); - } catch (IOException ex) { - logger.warn(ex.getMessage(), ex); - } - } - } - - public boolean isWorking() { - return working; - } - - @Override - public void writeMetrics(MetricsContainer container) { - if (collectMetrics) { - container.update(System.currentTimeMillis(), metric); - metric.reset(); - } - } - - @Override - public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { - tasks.add(new ConnectTask(sessionInfo, handler)); - selector.wakeup(); - } - - @Override - protected void flush(SelectionKey key) { - tasks.add(new FlushTask(key)); - selector.wakeup(); - } - - private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { - try { - ConnectionParams connParams = sessionMapper.map(sessionInfo); - if (connParams == null) { - if (logger.isDebugEnabled()) { - logger.debug("Unable to map session '{}'.", sessionInfo); - } - - if (collectMetrics) { - synchronized (metric) { - metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); - } - } - - try { - handler.sessionInvalidated(sessionInfo); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); - } - - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - - SocketAddress bindAddress = connParams.getBindAddress(); - if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { - channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); - } - - SocketAddress remoteAddress = connParams.getRemoteAddress(); - if (remoteAddress == null) { - remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); - } - - NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); - KeyContext keyContext = new KeyContext(channelContext, handler); - SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); - try { - handler.channelRegistered(channelContext); - } catch (Exception ex) { - doCatchException(key, ex); - } - - channelContext.selectionKey(key); - try { - channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); - } catch (Exception ex) { - doCatchException(key, ex); - return; - } - - selector.wakeup(); - } catch (Exception e) { - if (collectMetrics) { - metric.errorCaught(e); - } - logger.error(e.getMessage(), e); - } - - } - - private void doFinishConnect(SelectionKey key) { - SocketChannel channel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - - if (logger.isDebugEnabled()) { - logger.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); - } - - try { - long connStart = System.currentTimeMillis(); - boolean timeouted = false; - while (!channel.finishConnect()) { - long now = System.currentTimeMillis(); - if (now - connStart < connectionTimeout) { - timeouted = true; - break; - } - } - key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); - - if (timeouted) { - if (logger.isDebugEnabled()) { - logger.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); - } - - throw new ConnectException("Connection timed out."); - - } - } catch (Exception e) { - doCatchException(key, e); - key.cancel(); - if (collectMetrics) { - metric.incConnectionsErrors(); - } - - return; - } - - try { - if (collectMetrics) { - metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); - metric.addBindSocket(keyContext.channelContext.getLocalAddress()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); - } - - keyContext.handler.channelActive(keyContext.channelContext); - setOpRead(key); - } catch (Exception ex) { - logger.error(ex.getMessage(), ex); - } - } - - private void doWrite(SelectionKey key) { - SocketChannel socketChannel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - if (logger.isDebugEnabled()) { - logger.debug("Writing ({} -> {}).", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - keyContext.handler.dataWriteStart(keyContext.channelContext); - Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); - int written = 0; - try { - ByteBuffer buffer; - while (!queue.isEmpty()) { - buffer = queue.poll(); - while (buffer.hasRemaining()) { - int res = socketChannel.write(buffer); - - if (res == -1) { - doClose(key); - return; - } - - if (collectMetrics) { - metric.updateSentBytes(res); - } - - written += res; - } - } - } catch (Exception e) { - doCatchException(key, e); - doClose(key); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Written {}B ({} -> {})", written, - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - //TODO Operacje na handlerach powinny przechodzic przez Executor - try { - keyContext.handler.dataWritten(keyContext.channelContext); - logger.debug("Write handled."); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - setOpRead(key); - clearOpWrite(key); - } - - private void doRead(SelectionKey key) { - SocketChannel channel = (SocketChannel) key.channel(); - KeyContext keyContext = (KeyContext) key.attachment(); - - if (logger.isDebugEnabled()) { - logger.debug("Reading ({} -> {})", - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - ByteBuffer buffer = keyContext.buffer; - - buffer.clear(); - - ByteBuff buff = new HeapByteBuff(); - int totalReaded = 0; - int readed; - try { - while ((readed = channel.read(buffer)) > 0) { - buffer.flip(); - buff.append(buffer.array(), buffer.position(), buffer.limit()); - buffer.clear(); - totalReaded += readed; - - if (collectMetrics) { - metric.updateReceivedBytes(readed); - } - } - } catch (IOException e) { - doCatchException(key, e); - doClose(key); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Readed {}B ({} -> {})", totalReaded, - keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); - } - - if (totalReaded > 0) { - try { - keyContext.handler.dataReceived(keyContext.channelContext, buff); - logger.debug("Read handled."); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - } - - if (readed == -1) { - doClose(key); - return; - } - - keyContext.buffer.flip(); - } - - private void doCatchException(SelectionKey key, Throwable cause) { - if (logger.isDebugEnabled()) { - logger.debug("Error occured. " + cause.getMessage(), cause); - } - - if (collectMetrics) { - synchronized (metric) { - metric.errorCaught(cause); - } - } - - KeyContext keyContext = (KeyContext) key.attachment(); - - try { - keyContext.handler.errorOccurred(keyContext.channelContext, cause); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - } - - private void doClose(SelectionKey key) { - if (!key.channel().isOpen()) { - selector.wakeup(); - return; - } - - try { - key.channel().close(); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - KeyContext keyContext = (KeyContext) key.attachment(); - if (logger.isDebugEnabled()) { - logger.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); - } - - try { - keyContext.handler.channelInactive(keyContext.channelContext); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - key.cancel(); - try { - keyContext.handler.channelUnregistered(keyContext.channelContext); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } - - if (logger.isDebugEnabled()) { - logger.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); - } - - if (collectMetrics) { - metric.incClosedConnections(); - } - - selector.wakeup(); - } - - static void setOpRead(SelectionKey key) { - if (!key.isValid() || key.isReadable()) { - return; - } - - key.interestOps(key.interestOps() | SelectionKey.OP_READ); - key.selector().wakeup(); - } - - static void clearOpRead(SelectionKey key) { - if (!key.isValid() || !key.isReadable()) { - return; - } - - key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); - key.selector().wakeup(); - } - - static void setOpWrite(SelectionKey key) { - if (!key.isValid() || key.isWritable()) { - return; - } - - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - key.selector().wakeup(); - } - - static void clearOpWrite(SelectionKey key) { - if (!key.isValid() || !key.isWritable()) { - return; - } - - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - key.selector().wakeup(); - } - - @Override - protected void requestClose(SelectionKey key) { - tasks.add(new CloseTask(key)); - key.selector().wakeup(); - } - - @Override - public void run() { - int selected = 0; - working = true; - while (working) { - if (!tasks.isEmpty()) { - Task task; - while ((task = tasks.poll()) != null) { - if (task.code == Task.CLOSE) { - doClose(((CloseTask) task).key); - } else if (task.code == Task.CONNECT) { - ConnectTask taskConn = (ConnectTask) task; - doConnect(taskConn.sessionInfo, taskConn.handler); - } else if (task.code == Task.FLUSH) { - FlushTask flushTask = (FlushTask) task; - setOpWrite(flushTask.key); - } - } - } - - try { - selected = selector.select(selectTimeout); - } catch (IOException ex) { - logger.warn(ex.getMessage(), ex); - } - - if (selected > 0) { - Iterator<SelectionKey> it = selector.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey key = it.next(); - it.remove(); - - if (!key.isValid()) { - continue; - } - - if (key.isConnectable()) { - doFinishConnect(key); - } else if (key.isWritable()) { - doWrite(key); - } else if (key.isReadable()) { - doRead(key); - } - } - } - } - } - - private static class KeyContext { - - private final EmitterHandler handler; - - private final NioChannelContext channelContext; - - private ByteBuffer buffer = ByteBuffer.allocate(1024); - - public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { - this.channelContext = channelContext; - this.handler = handler; - } - - } - - private static abstract class Task { - - public static final int CLOSE = 1; - public static final int CONNECT = 2; - public static final int FLUSH = 3; - - private final int code; - - public Task(int code) { - this.code = code; - } - - } - - private final static class ConnectTask extends Task { - - private final SessionInfo sessionInfo; - private final EmitterHandler handler; - - public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { - super(CONNECT); - this.sessionInfo = sessionInfo; - this.handler = handler; - } - - } - - private final static class CloseTask extends Task { - - private final SelectionKey key; - - public CloseTask(SelectionKey key) { - super(CLOSE); - this.key = key; - } - - } - - private final static class FlushTask extends Task { - - private final SelectionKey key; - - public FlushTask(SelectionKey key) { - super(FLUSH); - this.key = key; - } - - } -}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Mon Jul 08 10:54:22 2019 +0200 @@ -58,7 +58,7 @@ private final Class<? extends NioEmitterWorker> workerClass; public NioEmitter() { - this(NioDefaultEmitterWorker.class); + this(NioSocketEmitterWorker.class); } public NioEmitter(Class<? extends NioEmitterWorker> workerClass) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java Mon Jul 08 10:54:22 2019 +0200 @@ -0,0 +1,58 @@ +package com.passus.st.emitter.nio; + +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.SessionInfo; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * @author Mirosław Hawrot + */ +public class NioSocketChannelContext extends NioChannelContext<SocketChannel> { + + public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { + super(worker, channel, remoteAddress, sessionInfo); + } + + @Override + int write0(ByteBuffer buffer) throws IOException { + return channel.write(buffer); + } + + @Override + int read0(ByteBuffer buffer) throws IOException { + return channel.read(buffer); + } + + @Override + boolean finishConnect() throws IOException { + return channel.finishConnect(); + } + + @Override + public boolean isConnected() { + return channel.isConnected(); + } + + @Override + public boolean isConnectionPending() { + return channel.isConnectionPending(); + } + + @Override + public SocketAddress getLocalAddress() { + try { + if (localAddress == null && channel.getLocalAddress() != null) { + localAddress = AddressUtils.jdkSocketToSocketAddress(channel.getLocalAddress()); + } + } catch (Exception e) { + } + + return localAddress; + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketEmitterWorker.java Mon Jul 08 10:54:22 2019 +0200 @@ -0,0 +1,73 @@ +package com.passus.st.emitter.nio; + +import com.passus.net.SocketAddress; +import com.passus.net.utils.AddressUtils; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper.ConnectionParams; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; + +/** + * @author Mirosław Hawrot + */ +public class NioSocketEmitterWorker extends NioAbstractEmitterWorker { + + public NioSocketEmitterWorker(int index) throws IOException { + super(index); + } + + @Override + protected void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { + try { + ConnectionParams connParams = getConnParams(sessionInfo, handler); + if (connParams == null) { + return; + } + + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + + SocketAddress bindAddress = connParams.getBindAddress(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); + } + + SocketAddress remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + NioSocketChannelContext channelContext = new NioSocketChannelContext(this, channel, remoteAddress, sessionInfo); + KeyContext keyContext = new KeyContext(channelContext, handler); + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(key, ex); + } + + channelContext.selectionKey(key); + try { + channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); + } catch (Exception ex) { + doCatchException(key, ex); + return; + } + + selector.wakeup(); + } catch (Exception e) { + if (collectMetrics) { + metric.errorCaught(e); + } + logger.error(e.getMessage(), e); + } + + } + + +}
--- a/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/job/JobExecutor.java Mon Jul 08 10:54:22 2019 +0200 @@ -8,7 +8,6 @@ import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.metric.ScheduledMetricsCollector; import com.passus.st.project.Project; -import com.passus.st.reporter.snmp.SnmpLogger; import com.passus.st.source.PcapSessionEventSource; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -16,7 +15,6 @@ import java.util.*; /** - * * @author Mirosław Hawrot */ public class JobExecutor { @@ -174,8 +172,6 @@ private ScheduledMetricsCollector metricsCollector; - private SnmpLogger snmpLogger; - private JobThread(JobContext context) { super("Job-" + context.getJobId()); this.context = context; @@ -378,11 +374,6 @@ } } - if (snmpLogger != null) { - snmpLogger.close(); - LOGGER.debug("SNMP logger stopped."); - } - metricsCollector.flush(true); metricsCollector.stop(); changeStatus(JobStatus.STOPPED);
--- a/stress-tester/src/test/java/com/passus/st/client/http/ReporterConfiguratorTest.java Fri May 31 13:25:02 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/http/ReporterConfiguratorTest.java Mon Jul 08 10:54:22 2019 +0200 @@ -6,14 +6,16 @@ import com.passus.config.YamlConfigurationReader; import com.passus.config.validation.Errors; import com.passus.st.reporter.trx.SocketReporterClient; +import org.testng.annotations.Test; + +import java.util.List; + import static com.passus.st.utils.ConfigurationContextConsts.REPORTER_DEFAULT_DESTINATION; import static com.passus.st.utils.ConfigurationContextConsts.REPORTER_DESTINATIONS; -import java.util.List; -import static org.testng.AssertJUnit.*; -import org.testng.annotations.Test; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; /** - * * @author mikolaj.podbielski */ public class ReporterConfiguratorTest { @@ -102,10 +104,10 @@ configure(cfgString, context); - ReporterRemoteDestination remote = (ReporterRemoteDestination) context.get(REPORTER_DEFAULT_DESTINATION); + ReporterRemoteDestination remote = context.get(REPORTER_DEFAULT_DESTINATION); checkRemote(remote, "1.1.1.1", 2222, 3); - List<ReporterDestination> destinations = (List<ReporterDestination>) context.get(REPORTER_DESTINATIONS); + List<ReporterDestination> destinations = context.get(REPORTER_DESTINATIONS); assertNotNull(destinations); assertEquals(3, destinations.size());