Mercurial > stress-tester
changeset 573:1fb90938c654
HttpSynchClientWorker invalid session bugfix
author | Devel 2 |
---|---|
date | Wed, 27 Sep 2017 15:47:49 +0200 |
parents | b9420fb384f2 |
children | f7c2e8285d7c |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java |
diffstat | 4 files changed, 141 insertions(+), 114 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Wed Sep 27 14:03:07 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSynchClientWorker.java Wed Sep 27 15:47:49 2017 +0200 @@ -8,6 +8,7 @@ import com.passus.st.client.SessionEvent; import com.passus.st.client.SessionStatusEvent; import com.passus.st.emitter.Emitter; +import com.passus.st.emitter.SessionInfo; import com.passus.st.plugin.PluginConstants; import java.util.concurrent.LinkedBlockingDeque; @@ -50,6 +51,20 @@ } @Override + public void sessionInvalidated(SessionInfo session) throws Exception { + synchronized (lock) { + if (logger.isDebugEnabled()) { + logger.debug("Session {} invalidated.", session); + } + + HttpFlowContext flowContext = flowContext(session); + if (flowContext != null) { + changeFlowState(flowContext, HttpFlowContext.STATE_DISCONNECTING); + } + } + } + + @Override protected void flowStateChanged(HttpFlowContext context, int oldState) { if (logger.isDebugEnabled()) { logger.debug("flowStateChanged {},{}", context == currFlowContext, context.stateString());
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Wed Sep 27 14:03:07 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterMetric.java Wed Sep 27 15:47:49 2017 +0200 @@ -123,6 +123,10 @@ public void errorCaught(Throwable cause) { String category = NetExceptionsCategory.errorToCategory(cause); + incErrorByCategory(category); + } + + public void incErrorByCategory(String category) { MutableInt count = errors.get(category); if (count == null) { count = new MutableInt(1); @@ -131,7 +135,7 @@ count.increment(); } } - + @Override public boolean isActive() { return active;
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Wed Sep 27 14:03:07 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Wed Sep 27 15:47:49 2017 +0200 @@ -13,6 +13,7 @@ import com.passus.st.emitter.SessionInfo; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; +import static com.passus.st.utils.NetExceptionsCategory.BIND_MAPPER_SESSION_INVALID; import java.io.IOException; import java.net.ConnectException; import java.nio.ByteBuffer; @@ -30,61 +31,61 @@ * @author Mirosław Hawrot */ public class NioEmitterWorker extends Thread implements MetricSource { - + private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class); - + private int index; - + private final Selector selector; - + private volatile boolean working = false; - + private long selectTimeout = 100; - + private Queue<Task> tasks = new ConcurrentLinkedQueue<>(); - + private SessionMapper sessionMapper; - + private volatile boolean collectMetrics = false; - + private long connectionTimeout = 5_000; - + private EmitterMetric metric; - + NioEmitterWorker(int index) throws IOException { super("NioEmitterWorker-" + index); selector = Selector.open(); } - + int getIndex() { return index; } - + public long getSelectTimeout() { return selectTimeout; } - + public void setSelectTimeout(long selectTimeout) { Assert.greaterThanZero(selectTimeout, "selectTimeout"); this.selectTimeout = selectTimeout; } - + public long getConnectionTimeout() { return connectionTimeout; } - + public void setConnectionTimeout(long connectionTimeout) { this.connectionTimeout = connectionTimeout; } - + public SessionMapper getSessionMapper() { return sessionMapper; } - + public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; } - + public void setWorking(boolean working) { this.working = working; if (this.working && !working) { @@ -95,12 +96,12 @@ } } } - + @Override public boolean isCollectMetrics() { return collectMetrics; } - + @Override public void setCollectMetrics(boolean collectMetrics) { if (collectMetrics && metric == null) { @@ -122,15 +123,15 @@ LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e); } } - + metric = null; } } - + public boolean isWorking() { return working; } - + @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { @@ -138,17 +139,17 @@ metric.reset(); } } - + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { tasks.add(new ConnectTask(sessionInfo, handler)); selector.wakeup(); } - + void flush(SelectionKey key) { tasks.add(new FlushTask(key)); selector.wakeup(); } - + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { try { ConnectionParams connParams = sessionMapper.map(sessionInfo); @@ -156,33 +157,39 @@ if (LOGGER.isDebugEnabled()) { LOGGER.debug("Unable to map session '" + sessionInfo + "'."); } - + + if (collectMetrics) { + synchronized (metric) { + metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); + } + } + try { handler.sessionInvalidated(sessionInfo); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'."); } - + SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); - + SocketAddress bindAddress = connParams.getBindAddress(); if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); } - + SocketAddress remoteAddress = connParams.getRemoteAddress(); if (remoteAddress == null) { remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); } - + NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); KeyContext keyContext = new KeyContext(channelContext, handler); SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); @@ -191,7 +198,7 @@ } catch (Exception ex) { doCatchException(key, ex); } - + channelContext.selectionKey(key); try { channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); @@ -199,7 +206,7 @@ doCatchException(key, ex); return; } - + selector.wakeup(); } catch (Exception e) { if (collectMetrics) { @@ -207,17 +214,17 @@ } LOGGER.error(e.getMessage(), e); } - + } - + private void doFinishConnect(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); } - + try { long connStart = System.currentTimeMillis(); boolean timeouted = false; @@ -229,14 +236,14 @@ } } key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); - + if (timeouted) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); } - + throw new ConnectException("Connection timed out."); - + } } catch (Exception e) { doCatchException(key, e); @@ -244,27 +251,27 @@ if (collectMetrics) { metric.incConnectionsErrors(); } - + return; } - + try { if (collectMetrics) { metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); metric.addBindSocket(keyContext.channelContext.getLocalAddress()); } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); } - + keyContext.handler.channelActive(keyContext.channelContext); setOpRead(key); } catch (Exception ex) { LOGGER.error(ex.getMessage(), ex); } } - + private void doWrite(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); @@ -272,7 +279,7 @@ LOGGER.debug("Writing ({} -> {}).", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + keyContext.handler.dataWriteStart(keyContext.channelContext); Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); int written = 0; @@ -282,16 +289,16 @@ buffer = queue.poll(); while (buffer.hasRemaining()) { int res = socketChannel.write(buffer); - + if (res == -1) { doClose(key); return; } - + if (collectMetrics) { metric.updateSentBytes(res); } - + written += res; } } @@ -300,7 +307,7 @@ doClose(key); return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Written {}B ({} -> {})", written, keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); @@ -313,24 +320,24 @@ } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + setOpRead(key); clearOpWrite(key); } - + private void doRead(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Reading ({} -> {})", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + ByteBuffer buffer = keyContext.buffer; - + buffer.clear(); - + ByteBuff buff = new HeapByteBuff(); int totalReaded = 0; int readed; @@ -340,7 +347,7 @@ buff.append(buffer.array(), buffer.position(), buffer.limit()); buffer.clear(); totalReaded += readed; - + if (collectMetrics) { metric.updateReceivedBytes(readed); } @@ -350,12 +357,12 @@ doClose(key); return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Readed {}B ({} -> {})", totalReaded, keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + if (totalReaded > 0) { try { keyContext.handler.dataReceived(keyContext.channelContext, buff); @@ -364,117 +371,117 @@ LOGGER.debug(e.getMessage(), e); } } - + if (readed == -1) { doClose(key); return; } - + keyContext.buffer.flip(); } - + private void doCatchException(SelectionKey key, Throwable cause) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Error occured. " + cause.getMessage(), cause); } - + if (collectMetrics) { synchronized (metric) { metric.errorCaught(cause); } } - + KeyContext keyContext = (KeyContext) key.attachment(); - + try { keyContext.handler.errorOccured(keyContext.channelContext, cause); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } } - + private void doClose(SelectionKey key) { if (!key.channel().isOpen()) { selector.wakeup(); return; } - + try { key.channel().close(); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + KeyContext keyContext = (KeyContext) key.attachment(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); } - + try { keyContext.handler.channelInactive(keyContext.channelContext); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + key.cancel(); try { keyContext.handler.channelUnregistered(keyContext.channelContext); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); } - + if (collectMetrics) { metric.incClosedConnections(); } - + selector.wakeup(); } - + static void setOpRead(SelectionKey key) { if (!key.isValid() || key.isReadable()) { return; } - + key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } - + static void clearOpRead(SelectionKey key) { if (!key.isValid() || !key.isReadable()) { return; } - + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); key.selector().wakeup(); } - + static void setOpWrite(SelectionKey key) { if (!key.isValid() || key.isWritable()) { return; } - + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); key.selector().wakeup(); } - + static void clearOpWrite(SelectionKey key) { if (!key.isValid() || !key.isWritable()) { return; } - + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); key.selector().wakeup(); } - + void requestClose(SelectionKey key) { tasks.add(new CloseTask(key)); key.selector().wakeup(); } - + @Override public void run() { int selected = 0; @@ -494,23 +501,23 @@ } } } - + try { selected = selector.select(selectTimeout); } catch (IOException ex) { LOGGER.warn(ex.getMessage(), ex); } - + if (selected > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); - + if (!key.isValid()) { continue; } - + if (key.isConnectable()) { doFinishConnect(key); } else if (key.isWritable()) { @@ -522,68 +529,68 @@ } } } - + private static class KeyContext { - + private final EmitterHandler handler; - + private final NioChannelContext channelContext; - + private ByteBuffer buffer = ByteBuffer.allocate(1024); - + public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { this.channelContext = channelContext; this.handler = handler; } - + } - + private static abstract class Task { - + public static final int CLOSE = 1; public static final int CONNECT = 2; public static final int FLUSH = 3; - + private final int code; - + public Task(int code) { this.code = code; } - + } - + private final static class ConnectTask extends Task { - + private final SessionInfo sessionInfo; private final EmitterHandler handler; - + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { super(CONNECT); this.sessionInfo = sessionInfo; this.handler = handler; } - + } - + private final static class CloseTask extends Task { - + private final SelectionKey key; - + public CloseTask(SelectionKey key) { super(CLOSE); this.key = key; } - + } - + private final static class FlushTask extends Task { - + private final SelectionKey key; - + public FlushTask(SelectionKey key) { super(FLUSH); this.key = key; } - + } }
--- a/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java Wed Sep 27 14:03:07 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/utils/NetExceptionsCategory.java Wed Sep 27 15:47:49 2017 +0200 @@ -16,6 +16,7 @@ public static final String CONNECTION_NO_ROUTE_TO_HOST = "connection.no_route_to_host"; public static final String CONNECTION_UNKNWON = "connection.unknwon"; + public static final String BIND_MAPPER_SESSION_INVALID = "bind.mapper_session_invalid"; public static final String BIND_NETWORK_UNREACHABLE = "bind.network_unreachable"; public static final String BIND_ADDRESS_ALREADY_IN_USE = "bind.address_already_in_use"; public static final String BIND_UNKNWON = "bind.unknown";