Mercurial > stress-tester
changeset 978:d88ea87ac0a7
SocketEmitter in progress, NioEmitter improvements
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Jul 29 12:54:31 2019 +0200 @@ -354,6 +354,7 @@ context.setBidirectional(flowContext.isBidirectional()); flowContext.channelContext(context); + context.setAttachment(flowContext); changeFlowState(flowContext, STATE_CONNECTED); } @@ -364,14 +365,12 @@ @Override public void channelInactive(ChannelContext context) throws Exception { synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Channel inactive."); - } + FlowContext flowContext = (FlowContext) context.getAttachment(); + if (logger.isDebugEnabled()) { + debug(flowContext, "Channel inactive."); + } - changeFlowState(flowContext, STATE_DISCONNECTED); - } + changeFlowState(flowContext, STATE_DISCONNECTED); lock.notifyAll(); } } @@ -379,42 +378,40 @@ @Override public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { synchronized (lock) { - FlowContext flowContext = flowContext(context); + FlowContext flowContext = (FlowContext) context.getAttachment(); try { - if (flowContext != null) { - FlowHandler client = flowContext.client(); - FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); - decoder.decode(data, flowContext); - long now = timeGenerator.currentTimeMillis(); - if (flowContext.receivedStartTimestamp() == -1) { - flowContext.receivedStartTimestamp(now); + FlowHandler client = flowContext.client(); + FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); + decoder.decode(data, flowContext); + long now = timeGenerator.currentTimeMillis(); + if (flowContext.receivedStartTimestamp() == -1) { + flowContext.receivedStartTimestamp(now); + } + + if (decoder.state() == DataDecoder.STATE_ERROR) { + if (logger.isDebugEnabled()) { + debug(flowContext, "Decoder error. " + decoder.getLastError()); } - if (decoder.state() == DataDecoder.STATE_ERROR) { - if (logger.isDebugEnabled()) { - debug(flowContext, "Decoder error. " + decoder.getLastError()); - } + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); + } else if (decoder.state() == DataDecoder.STATE_FINISHED) { + Object resp = decoder.getResult(); + Object req = null; + if (flowContext.sentEvent() != null) { + req = flowContext.sentEvent().getRequest(); + } - decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } else if (decoder.state() == DataDecoder.STATE_FINISHED) { - Object resp = decoder.getResult(); - Object req = null; - if (flowContext.sentEvent() != null) { - req = flowContext.sentEvent().getRequest(); + if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { + try { + fireResponseReceived(req, resp, flowContext); + } catch (Exception e) { + error(flowContext, e.getMessage(), e); } + } - if (filterChain.filterInbound(req, resp, flowContext) != Filter.DENY) { - try { - fireResponseReceived(req, resp, flowContext); - } catch (Exception e) { - error(flowContext, e.getMessage(), e); - } - } - - decoder.clear(flowContext); - changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); - } + decoder.clear(flowContext); + changeFlowState(flowContext, FlowContext.STATE_RESP_RECEIVED); } } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -429,8 +426,8 @@ @Override public void dataWriteStart(ChannelContext context) { synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { + FlowContext flowContext = (FlowContext) context.getAttachment(); + if (flowContext.sentEvent() != null) { long now = timeGenerator.currentTimeMillis(); flowContext.sendStartTimestamp(now); flowContext.client().onDataWriteStart(flowContext); @@ -441,8 +438,8 @@ @Override public void dataWritten(ChannelContext context) throws Exception { synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null && flowContext.sentEvent() != null) { + FlowContext flowContext = (FlowContext) context.getAttachment(); + if (flowContext.sentEvent() != null) { long now = timeGenerator.currentTimeMillis(); if (collectMetric) { synchronized (metric) { @@ -467,11 +464,8 @@ } synchronized (lock) { - FlowContext flowContext = flowContext(context); - if (flowContext != null) { - changeFlowState(flowContext, FlowContext.STATE_ERROR); - } - + FlowContext flowContext = (FlowContext) context.getAttachment(); + changeFlowState(flowContext, FlowContext.STATE_ERROR); lock.notifyAll(); } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/ChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -8,7 +8,7 @@ /** * @author Mirosław Hawrot */ -public interface ChannelContext { +public interface ChannelContext<T> { boolean isBidirectional(); @@ -42,4 +42,8 @@ SessionInfo getSessionInfo(); + void setAttachment(T attachment); + + T getAttachment(); + }
--- a/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/Emitter.java Mon Jul 29 12:54:31 2019 +0200 @@ -20,6 +20,8 @@ public static final SessionMapper DEFAULT_SESSION_MAPPER = new PassThroughSessionMapper(); + public static final boolean DEFAULT_COLLECT_METRICS = false; + public void setSessionMapper(SessionMapper mapper); public SessionMapper getSessionMapper();
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java Mon Jul 29 12:54:31 2019 +0200 @@ -14,7 +14,7 @@ /** * @author Mirosław Hawrot */ -public class SessionInfo { +public final class SessionInfo { private static final int DEFAULT_TRANSPORT = Session.PROTOCOL_TCP;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Mon Jul 29 12:54:31 2019 +0200 @@ -258,14 +258,19 @@ protected void doWrite(SelectionKey key) { KeyContext keyContext = (KeyContext) key.attachment(); + + NioChannelContext channelContext = keyContext.channelContext; + Queue<ByteBuffer> queue = channelContext.dataQueue(); + if(queue.isEmpty()) { + return; + } + + keyContext.handler.dataWriteStart(channelContext); if (logger.isDebugEnabled()) { logger.debug("Writing ({} -> {}).", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - NioChannelContext channelContext = keyContext.channelContext; - keyContext.handler.dataWriteStart(channelContext); - Queue<ByteBuffer> queue = channelContext.dataQueue(); int written = 0; try { ByteBuffer buffer; @@ -320,7 +325,6 @@ } ByteBuffer buffer = keyContext.buffer; - buffer.clear(); NioChannelContext channelContext = keyContext.channelContext; @@ -371,18 +375,16 @@ int selected = 0; working = true; while (working) { - if (!tasks.isEmpty()) { - Task task; - while ((task = tasks.poll()) != null) { - if (task.code == Task.CLOSE) { - doClose(((CloseTask) task).key); - } else if (task.code == Task.CONNECT) { - ConnectTask taskConn = (ConnectTask) task; - doConnect(taskConn.sessionInfo, taskConn.handler); - } else if (task.code == Task.FLUSH) { - FlushTask flushTask = (FlushTask) task; - setOpWrite(flushTask.key); - } + Task task; + while ((task = tasks.poll()) != null) { + if (task.code == Task.CLOSE) { + doClose(((CloseTask) task).key); + } else if (task.code == Task.CONNECT) { + ConnectTask taskConn = (ConnectTask) task; + doConnect(taskConn.sessionInfo, taskConn.handler); + } else if (task.code == Task.FLUSH) { + FlushTask flushTask = (FlushTask) task; + setOpWrite(flushTask.key); } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -11,7 +11,7 @@ import java.util.LinkedList; import java.util.Queue; -public abstract class NioChannelContext<T> implements ChannelContext { +public abstract class NioChannelContext<T, K> implements ChannelContext<K> { protected final NioEmitterWorker worker; @@ -29,6 +29,8 @@ protected SelectionKey key; + private K attachment; + public NioChannelContext(NioEmitterWorker worker, T channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.worker = worker; this.channel = channel; @@ -91,5 +93,12 @@ return sessionInfo; } + @Override + public K getAttachment() { + return attachment; + } + public void setAttachment(K attachment) { + this.attachment = attachment; + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioChannelContext2.java Mon Jul 29 12:54:31 2019 +0200 @@ -16,7 +16,7 @@ /** * @author Mirosław Hawrot */ -public class NioChannelContext2 implements ChannelContext { +public class NioChannelContext2<K> implements ChannelContext<K> { private final NioEmitterWorker2 worker; @@ -34,6 +34,8 @@ private SelectionKey key; + private K attachment; + /** * Usunac */ @@ -122,4 +124,13 @@ return sessionInfo; } + @Override + public K getAttachment() { + return attachment; + } + + @Override + public void setAttachment(K attachment) { + this.attachment = attachment; + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioDatagramChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -8,7 +8,7 @@ import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -public class NioDatagramChannelContext extends NioChannelContext<DatagramChannel> { +public class NioDatagramChannelContext<T> extends NioChannelContext<DatagramChannel, T> { public NioDatagramChannelContext(NioEmitterWorker worker, DatagramChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { super(worker, channel, remoteAddress, sessionInfo);
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitter.java Mon Jul 29 12:54:31 2019 +0200 @@ -39,8 +39,6 @@ private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000; - private static final boolean DEFAULT_COLLECT_METRICS = false; - private NioEmitterWorker[] workers; private int maxThreads = DEFAULT_NUM_THREADS;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Mon Jul 29 12:54:31 2019 +0200 @@ -21,7 +21,7 @@ private final int index; - protected long selectTimeout = 100; + protected long selectTimeout = 1; protected SessionMapper sessionMapper;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorkerImpl.java Mon Jul 29 12:54:31 2019 +0200 @@ -36,6 +36,10 @@ return; } + if (logger.isDebugEnabled()) { + logger.debug("Registering TCP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); @@ -84,7 +88,7 @@ } if (logger.isDebugEnabled()) { - logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); } DatagramChannel channel = DatagramChannel.open();
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioSocketChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -11,7 +11,7 @@ /** * @author Mirosław Hawrot */ -public class NioSocketChannelContext extends NioChannelContext<SocketChannel> { +public class NioSocketChannelContext<T> extends NioChannelContext<SocketChannel, T> { public NioSocketChannelContext(NioEmitterWorker worker, SocketChannel channel, SocketAddress remoteAddress, SessionInfo sessionInfo) { super(worker, channel, remoteAddress, sessionInfo);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/AbstractChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -0,0 +1,90 @@ +package com.passus.st.emitter.socket; + +import com.passus.data.ByteBuff; +import com.passus.net.SocketAddress; +import com.passus.st.emitter.ChannelContext; +import com.passus.st.emitter.SessionInfo; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + +public abstract class AbstractChannelContext<K> implements ChannelContext<K> { + + private final Connection connection; + + private boolean bidirectional; + + private final Queue<byte[]> dataQueue; + + private K attachment; + + public AbstractChannelContext(Connection connection) { + this.connection = connection; + this.dataQueue = new LinkedList<>(); + } + + @Override + public boolean isBidirectional() { + return bidirectional; + } + + @Override + public void setBidirectional(boolean bidirectional) { + this.bidirectional = bidirectional; + } + + @Override + public SocketAddress getLocalAddress() { + return connection.localAddress; + } + + @Override + public SocketAddress getRemoteAddress() { + return connection.remoteAddress; + } + + @Override + public SessionInfo getSessionInfo() { + return connection.sessionInfo; + } + + public Queue<byte[]> dataQueue() { + return dataQueue; + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + byte[] out = new byte[length]; + System.arraycopy(data, offset, out, 0, length); + dataQueue.add(out); + } + + @Override + public void write(ByteBuff data) throws IOException { + dataQueue.add(data.toArray()); + } + + public void flush() throws IOException { + synchronized (connection) { + connection.flag = Connection.FLAG_FLUSH; + connection.notify(); + } + } + + public void close() throws IOException { + synchronized (connection) { + connection.flag = Connection.FLAG_CLOSE; + connection.notify(); + } + } + + @Override + public K getAttachment() { + return attachment; + } + + public void setAttachment(K attachment) { + this.attachment = attachment; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jul 29 12:54:31 2019 +0200 @@ -0,0 +1,134 @@ +package com.passus.st.emitter.socket; + +import com.passus.net.SocketAddress; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Queue; + +public abstract class Connection extends Thread { + + protected final Logger logger = LogManager.getLogger(getClass()); + + static final int FLAG_CONNECT = 1; + static final int FLAG_FLUSH = 2; + static final int FLAG_READ = 3; + static final int FLAG_CLOSE = 4; + + private boolean working = true; + + int flag = FLAG_CONNECT; + + final SessionInfo sessionInfo; + + final EmitterHandler handler; + + final SessionMapper sessionMapper; + + SocketAddress localAddress; + + SocketAddress remoteAddress; + + AbstractChannelContext channelContext; + + public Connection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) { + this.sessionInfo = sessionInfo; + this.handler = handler; + this.sessionMapper = sessionMapper; + } + + protected final ConnectionParams getConnParams(SessionInfo sessionInfo, EmitterHandler handler) { + ConnectionParams connParams = sessionMapper.map(sessionInfo); + if (connParams == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to map session '{}'.", sessionInfo); + } + + /* if (collectMetrics) { + synchronized (metric) { + metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); + } + }*/ + + try { + handler.sessionInvalidated(sessionInfo); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + return null; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + return connParams; + } + + protected void doCatchException(AbstractChannelContext channelContext, Throwable cause) { + if (logger.isDebugEnabled()) { + logger.debug("Error occurred. " + cause.getMessage(), cause); + } + + /*if (collectMetrics) { + synchronized (metric) { + metric.errorCaught(cause); + } + }*/ + + try { + handler.errorOccurred(channelContext, cause); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + + public abstract void connect(); + + public abstract void close(); + + private void write(byte[] data) { + write(data, 0, data.length); + } + + public abstract void write(byte[] data, int offset, int length); + + @Override + public void run() { + connect(); + while (working) { + synchronized (this) { + if (flag != 0) { + try { + wait(1); + } catch (InterruptedException ignore) { + } + } + + switch (flag) { + case FLAG_FLUSH: + Queue<byte[]> queue = channelContext.dataQueue(); + if (!queue.isEmpty()) { + byte[] data; + while ((data = queue.poll()) != null) { + write(data); + } + } + + break; + case FLAG_READ: + break; + case FLAG_CLOSE: + close(); + break; + + } + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramConnection.java Mon Jul 29 12:54:31 2019 +0200 @@ -0,0 +1,118 @@ +package com.passus.st.emitter.socket; + +import com.passus.net.SocketAddress; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.emitter.SessionMapper.ConnectionParams; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; + +import static com.passus.net.utils.AddressUtils.jdkSocketToSocketAddress; +import static com.passus.net.utils.AddressUtils.socketAddressToJdkSocket; +import static com.passus.st.emitter.SessionMapper.ANY_SOCKET; + +public class DatagramConnection extends Connection { + + private final Logger logger = LogManager.getLogger(DatagramConnection.class); + + private DatagramSocket socket; + + private java.net.SocketAddress remoteSocket; + + public DatagramConnection(SessionInfo sessionInfo, EmitterHandler handler, SessionMapper sessionMapper) { + super(sessionInfo, handler, sessionMapper); + } + + public void connect() { + ConnectionParams connParams = getConnParams(sessionInfo, handler); + if (connParams == null) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Registering UDP session '{}'. Mapped connection parameters '{}'.", sessionInfo, connParams); + } + + try { + SocketAddress bindAddress = connParams.getBindAddress(); + if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { + socket = new DatagramSocket(socketAddressToJdkSocket(bindAddress)); + } else { + socket = new DatagramSocket(); + } + } catch (SocketException ex) { + doCatchException(channelContext, ex); + return; + } + + remoteAddress = connParams.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); + } + + java.net.SocketAddress localSocketAddress = socket.getLocalSocketAddress(); + remoteSocket = socketAddressToJdkSocket(remoteAddress); + localAddress = jdkSocketToSocketAddress(localSocketAddress); + + channelContext = new DatagramSocketChannelContext(this); + try { + handler.channelRegistered(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + + try { + socket.connect(remoteSocket); + handler.channelActive(channelContext); + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + } + + public void close() { + try { + socket.disconnect(); + } catch (Exception ignore) { + + } + + try { + socket.close(); + } catch (Exception ignore) { + + } + } + + public void write(byte[] data, int offset, int length) { + try { + handler.dataWriteStart(channelContext); + if (logger.isDebugEnabled()) { + logger.debug("Writing ({} -> {}).", channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + DatagramPacket packet = new DatagramPacket(data, offset, data.length, remoteSocket); + socket.send(packet); + + if (logger.isDebugEnabled()) { + logger.debug("Written {}B ({} -> {})", length, + channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + + try { + handler.dataWritten(channelContext); + logger.debug("Write handled."); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } catch (Exception ex) { + doCatchException(channelContext, ex); + } + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/DatagramSocketChannelContext.java Mon Jul 29 12:54:31 2019 +0200 @@ -0,0 +1,20 @@ +package com.passus.st.emitter.socket; + +public class DatagramSocketChannelContext<K> extends AbstractChannelContext<K> { + + public DatagramSocketChannelContext(Connection connection) { + super(connection); + } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public boolean isConnectionPending() { + return false; + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/SocketEmitter.java Mon Jul 29 12:54:31 2019 +0200 @@ -0,0 +1,125 @@ +package com.passus.st.emitter.socket; + +import com.passus.commons.Assert; +import com.passus.commons.annotations.Plugin; +import com.passus.net.session.Session; +import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.EmitterHandler; +import com.passus.st.emitter.SessionInfo; +import com.passus.st.emitter.SessionMapper; +import com.passus.st.metric.MetricsContainer; +import com.passus.st.plugin.PluginConstants; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@Plugin(name = SocketEmitter.TYPE, category = PluginConstants.CATEGORY_EMITTER) +public class SocketEmitter implements Emitter { + + private static final Logger LOGGER = LogManager.getLogger(SocketEmitter.class); + + public static final String TYPE = "socket"; + + private static final int DEFAULT_NUM_THREADS = 4; + + private SessionMapper sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; + + private static final long DEFAULT_CONNECTION_TIMEOUT = 5_000; + + private boolean collectMetrics = DEFAULT_COLLECT_METRICS; + + private Map<SessionInfo, Connection> connections = new HashMap<>(); + + private int maxThreads = DEFAULT_NUM_THREADS; + + private long connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + + private boolean started; + + @Override + public void setSessionMapper(SessionMapper sessionMapper) { + Assert.notNull(sessionMapper, "sessionMapper"); + this.sessionMapper = sessionMapper; + } + + @Override + public SessionMapper getSessionMapper() { + return sessionMapper; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + synchronized (this) { + if (started) { + return; + } + + if (sessionMapper == null) { + sessionMapper = Emitter.DEFAULT_SESSION_MAPPER; + } + + started = true; + } + } + + @Override + public void stop() { + synchronized (this) { + if (!started) { + return; + } + + started = false; + } + } + + private void checkStarted() { + if (!started) { + throw new IllegalStateException("NioEmitter is not started."); + } + } + + + @Override + public void connect(SessionInfo sessionInfo, EmitterHandler handler, int workerIndex) throws IOException { + synchronized (this) { + checkStarted(); + if (connections.containsKey(sessionInfo)) { + throw new IOException("Connection established already."); + } + + if (sessionInfo.getTransport() == Session.PROTOCOL_TCP) { + throw new UnsupportedOperationException("Not implemented yet."); + } else if (sessionInfo.getTransport() == Session.PROTOCOL_UDP) { + DatagramConnection connection = new DatagramConnection(sessionInfo, handler, sessionMapper); + connection.start(); + connections.put(sessionInfo, connection); + } + } + } + + @Override + public void writeMetrics(MetricsContainer container) { + + } + + +}
--- a/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/FlowExecutorTest.java Mon Jul 29 12:54:31 2019 +0200 @@ -2,6 +2,7 @@ import com.passus.commons.service.ServiceUtils; import com.passus.st.AbstractWireMockTest; +import com.passus.st.Log4jConfigurationFactory; import com.passus.st.emitter.RuleBasedSessionMapper; import com.passus.st.emitter.nio.NioEmitter; import com.passus.st.utils.EventUtils; @@ -38,6 +39,7 @@ @Test public void testHandle_HTTP() throws Exception { + Log4jConfigurationFactory.enableFactory("debug"); Map<String, Object> props = new HashMap<>(); props.put("allowPartialSession", true); props.put("ports", 4214);
--- a/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/SynchFlowWorkerTest.java Mon Jul 29 12:54:31 2019 +0200 @@ -131,6 +131,8 @@ protected boolean bidirectional = true; + private Object attachment; + public LocalChannelContext(LocalEmitter emitter, EmitterHandler handler, SocketAddress remoteAddress, SessionInfo sessionInfo) { this.emitter = emitter; this.handler = handler; @@ -198,6 +200,15 @@ return sessionInfo; } + @Override + public Object getAttachment() { + return attachment; + } + + @Override + public void setAttachment(Object attachment) { + this.attachment = attachment; + } } private SynchFlowWorker createWorker() {
--- a/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Thu Jul 25 13:34:42 2019 +0200 +++ b/stress-tester/src/test/java/com/passus/st/utils/SimpleDatagramServer.java Mon Jul 29 12:54:31 2019 +0200 @@ -10,6 +10,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.SocketException; import java.util.ArrayList; import java.util.List; @@ -75,7 +76,7 @@ } try { - DatagramSocket serverSocket = new DatagramSocket(port, InetAddress.getByName(host)); + serverSocket = new DatagramSocket(port, InetAddress.getByName(host)); LOGGER.debug("Datagram socket created."); serverThread = new ServerThread(serverSocket); serverThread.start(); @@ -91,11 +92,16 @@ return; } + try { + serverSocket.close(); + } catch (Exception e) { + e.printStackTrace(); + } + serverThread.working = false; - serverSocket.disconnect(); serverThread.interrupt(); try { - serverThread.join(5_000); + serverThread.join(); } catch (InterruptedException ignore) { } @@ -120,6 +126,10 @@ while (working) { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { + if (serverSocket.isClosed()) { + break; + } + serverSocket.receive(packet); if (packet.getLength() > 0) { if (decoder != null) { @@ -142,6 +152,16 @@ } } } + } catch (SocketException e) { + if(e.getMessage().contains("socket closed")) { + break; + } + + if (listener != null) { + listener.onError(e); + } + + LOGGER.error(e.getMessage(), e); } catch (IOException e) { if (listener != null) { listener.onError(e); @@ -150,6 +170,8 @@ LOGGER.error(e.getMessage(), e); } } + + LOGGER.debug("Server thread stopped."); } }