Mercurial > stress-tester
changeset 605:5c0b5bfd2972
HttpDebugFilter
author | Devel 1 |
---|---|
date | Mon, 09 Oct 2017 09:07:31 +0200 |
parents | 4cbc75e90f7e |
children | 2e185909d22c |
files | stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java stress-tester/src/main/java/com/passus/st/client/http/filter/HttpDebugFilter.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java |
diffstat | 3 files changed, 176 insertions(+), 114 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java Fri Oct 06 10:42:04 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpMessageWriter.java Mon Oct 09 09:07:31 2017 +0200 @@ -49,6 +49,9 @@ public void writeSilently(HttpMessage message, File file) { try { + if (file.exists()) { + LOGGER.warn("File '{}' already exists.", file.getAbsolutePath()); + } write(message, file); } catch (IOException ex) { LOGGER.debug("Could not save file '{}'.", file.getAbsolutePath());
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/filter/HttpDebugFilter.java Mon Oct 09 09:07:31 2017 +0200 @@ -0,0 +1,59 @@ +package com.passus.st.client.http.filter; + +import com.passus.commons.annotations.Plugin; +import com.passus.config.annotations.NodeDefinitionCreate; +import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; +import com.passus.config.schema.NodeDefinition; +import com.passus.config.schema.NodeDefinitionCreator; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.client.http.HttpFlowContext; +import com.passus.st.plugin.PluginConstants; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author mikolaj.podbielski + */ +@Plugin(name = HttpDebugFilter.TYPE, category = PluginConstants.CATEGORY_HTTP_FILTER) +@NodeDefinitionCreate(HttpDebugFilter.EmptyDefCreator.class) +public class HttpDebugFilter extends HttpFilter { + + private static final Logger LOGGER = LogManager.getLogger(HttpDebugFilter.class); + + public static final String TYPE = "debug"; + + @Override + public int filterOutbound(HttpRequest req, HttpResponse resp, HttpFlowContext context) { + LOGGER.warn("OUT {} {} @{}", info(req), info(resp), System.identityHashCode(context)); + return DUNNO; + } + + @Override + public int filterInbound(HttpRequest req, HttpResponse resp, HttpFlowContext context) { + LOGGER.warn("IN {} {} @{}", info(req), info(resp), System.identityHashCode(context)); + return DUNNO; + } + + @Override + public HttpDebugFilter instanceForWorker(int index) { + return new HttpDebugFilter(); + } + + public static String info(HttpRequest req) { + return String.format("[%s @%d %s]", req.getId(), System.identityHashCode(req), req.getUrl().toString()); + } + + private Object info(HttpResponse resp) { + return String.format("[%d @%s]", resp.getStatus().getCode(), System.identityHashCode(resp)); + } + + public static class EmptyDefCreator implements NodeDefinitionCreator { + + @Override + public NodeDefinition create() { + return mapDef(); + } + } +}
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Fri Oct 06 10:42:04 2017 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioEmitterWorker.java Mon Oct 09 09:07:31 2017 +0200 @@ -31,61 +31,61 @@ * @author Mirosław Hawrot */ public class NioEmitterWorker extends Thread implements MetricSource { - + private static final Logger LOGGER = LogManager.getLogger(NioEmitterWorker.class); - + private int index; - + private final Selector selector; - + private volatile boolean working = false; - + private long selectTimeout = 100; - + private Queue<Task> tasks = new ConcurrentLinkedQueue<>(); - + private SessionMapper sessionMapper; - + private volatile boolean collectMetrics = false; - + private long connectionTimeout = 5_000; - + private EmitterMetric metric; - + NioEmitterWorker(int index) throws IOException { super("NioEmitterWorker-" + index); selector = Selector.open(); } - + int getIndex() { return index; } - + public long getSelectTimeout() { return selectTimeout; } - + public void setSelectTimeout(long selectTimeout) { Assert.greaterThanZero(selectTimeout, "selectTimeout"); this.selectTimeout = selectTimeout; } - + public long getConnectionTimeout() { return connectionTimeout; } - + public void setConnectionTimeout(long connectionTimeout) { this.connectionTimeout = connectionTimeout; } - + public SessionMapper getSessionMapper() { return sessionMapper; } - + public void setSessionMapper(SessionMapper sessionMapper) { this.sessionMapper = sessionMapper; } - + public void setWorking(boolean working) { this.working = working; if (this.working && !working) { @@ -96,12 +96,12 @@ } } } - + @Override public boolean isCollectMetrics() { return collectMetrics; } - + @Override public void setCollectMetrics(boolean collectMetrics) { if (collectMetrics && metric == null) { @@ -123,15 +123,15 @@ LOGGER.debug("Error occured during metric deactivation. " + e.getMessage(), e); } } - + metric = null; } } - + public boolean isWorking() { return working; } - + @Override public void writeMetrics(MetricsContainer container) { if (collectMetrics) { @@ -139,17 +139,17 @@ metric.reset(); } } - + public void connect(SessionInfo sessionInfo, EmitterHandler handler) throws IOException { tasks.add(new ConnectTask(sessionInfo, handler)); selector.wakeup(); } - + void flush(SelectionKey key) { tasks.add(new FlushTask(key)); selector.wakeup(); } - + private void doConnect(SessionInfo sessionInfo, EmitterHandler handler) { try { ConnectionParams connParams = sessionMapper.map(sessionInfo); @@ -157,39 +157,39 @@ if (LOGGER.isDebugEnabled()) { LOGGER.debug("Unable to map session '" + sessionInfo + "'."); } - + if (collectMetrics) { synchronized (metric) { metric.incErrorByCategory(BIND_MAPPER_SESSION_INVALID); } } - + try { handler.sessionInvalidated(sessionInfo); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Registering session '" + sessionInfo + "'. Mapped connection parameters '" + connParams + "'."); } - + SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); - + SocketAddress bindAddress = connParams.getBindAddress(); if (bindAddress != null && ANY_SOCKET.equals(bindAddress)) { channel.bind(AddressUtils.socketAddressToJdkSocket(bindAddress)); } - + SocketAddress remoteAddress = connParams.getRemoteAddress(); if (remoteAddress == null) { remoteAddress = new SocketAddress(sessionInfo.getDstIp(), sessionInfo.getDstPort()); } - + NioChannelContext channelContext = new NioChannelContext(this, channel, remoteAddress, sessionInfo); KeyContext keyContext = new KeyContext(channelContext, handler); SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT, keyContext); @@ -198,7 +198,7 @@ } catch (Exception ex) { doCatchException(key, ex); } - + channelContext.selectionKey(key); try { channel.connect(AddressUtils.socketAddressToJdkSocket(remoteAddress)); @@ -206,7 +206,7 @@ doCatchException(key, ex); return; } - + selector.wakeup(); } catch (Exception e) { if (collectMetrics) { @@ -214,17 +214,17 @@ } LOGGER.error(e.getMessage(), e); } - + } - + private void doFinishConnect(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connecting to '" + keyContext.channelContext.getRemoteAddress() + "'."); } - + try { long connStart = System.currentTimeMillis(); boolean timeouted = false; @@ -236,14 +236,14 @@ } } key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); - + if (timeouted) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connection to '{}' timed out.", keyContext.channelContext.getRemoteAddress()); } - + throw new ConnectException("Connection timed out."); - + } } catch (Exception e) { doCatchException(key, e); @@ -251,27 +251,27 @@ if (collectMetrics) { metric.incConnectionsErrors(); } - + return; } - + try { if (collectMetrics) { metric.addRemoteSocketConnection(keyContext.channelContext.getRemoteAddress()); metric.addBindSocket(keyContext.channelContext.getLocalAddress()); } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Connected to '{}'.", keyContext.channelContext.getRemoteAddress()); } - + keyContext.handler.channelActive(keyContext.channelContext); setOpRead(key); } catch (Exception ex) { LOGGER.error(ex.getMessage(), ex); } } - + private void doWrite(SelectionKey key) { SocketChannel socketChannel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); @@ -279,7 +279,7 @@ LOGGER.debug("Writing ({} -> {}).", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + keyContext.handler.dataWriteStart(keyContext.channelContext); Queue<ByteBuffer> queue = keyContext.channelContext.dataQueue(); int written = 0; @@ -289,16 +289,16 @@ buffer = queue.poll(); while (buffer.hasRemaining()) { int res = socketChannel.write(buffer); - + if (res == -1) { doClose(key); return; } - + if (collectMetrics) { metric.updateSentBytes(res); } - + written += res; } } @@ -307,7 +307,7 @@ doClose(key); return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Written {}B ({} -> {})", written, keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); @@ -320,24 +320,24 @@ } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + setOpRead(key); clearOpWrite(key); } - + private void doRead(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); KeyContext keyContext = (KeyContext) key.attachment(); - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Reading ({} -> {})", keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + ByteBuffer buffer = keyContext.buffer; - + buffer.clear(); - + ByteBuff buff = new HeapByteBuff(); int totalReaded = 0; int readed; @@ -347,7 +347,7 @@ buff.append(buffer.array(), buffer.position(), buffer.limit()); buffer.clear(); totalReaded += readed; - + if (collectMetrics) { metric.updateReceivedBytes(readed); } @@ -357,12 +357,12 @@ doClose(key); return; } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Readed {}B ({} -> {})", totalReaded, keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } - + if (totalReaded > 0) { try { keyContext.handler.dataReceived(keyContext.channelContext, buff); @@ -371,117 +371,117 @@ LOGGER.debug(e.getMessage(), e); } } - + if (readed == -1) { doClose(key); return; } - + keyContext.buffer.flip(); } - + private void doCatchException(SelectionKey key, Throwable cause) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Error occured. " + cause.getMessage(), cause); } - + if (collectMetrics) { synchronized (metric) { metric.errorCaught(cause); } } - + KeyContext keyContext = (KeyContext) key.attachment(); - + try { keyContext.handler.errorOccured(keyContext.channelContext, cause); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } } - + private void doClose(SelectionKey key) { if (!key.channel().isOpen()) { selector.wakeup(); return; } - + try { key.channel().close(); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + KeyContext keyContext = (KeyContext) key.attachment(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closing session '" + keyContext.channelContext.getSessionInfo() + "'."); } - + try { keyContext.handler.channelInactive(keyContext.channelContext); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + key.cancel(); try { keyContext.handler.channelUnregistered(keyContext.channelContext); } catch (Exception e) { LOGGER.debug(e.getMessage(), e); } - + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Closed session '" + keyContext.channelContext.getSessionInfo() + "'."); } - + if (collectMetrics) { metric.incClosedConnections(); } - + selector.wakeup(); } - + static void setOpRead(SelectionKey key) { if (!key.isValid() || key.isReadable()) { return; } - + key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } - + static void clearOpRead(SelectionKey key) { if (!key.isValid() || !key.isReadable()) { return; } - + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); key.selector().wakeup(); } - + static void setOpWrite(SelectionKey key) { if (!key.isValid() || key.isWritable()) { return; } - + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); key.selector().wakeup(); } - + static void clearOpWrite(SelectionKey key) { if (!key.isValid() || !key.isWritable()) { return; } - + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); key.selector().wakeup(); } - + void requestClose(SelectionKey key) { tasks.add(new CloseTask(key)); key.selector().wakeup(); } - + @Override public void run() { int selected = 0; @@ -501,23 +501,23 @@ } } } - + try { selected = selector.select(selectTimeout); } catch (IOException ex) { LOGGER.warn(ex.getMessage(), ex); } - + if (selected > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); - + if (!key.isValid()) { continue; } - + if (key.isConnectable()) { doFinishConnect(key); } else if (key.isWritable()) { @@ -529,68 +529,68 @@ } } } - + private static class KeyContext { - + private final EmitterHandler handler; - + private final NioChannelContext channelContext; - + private ByteBuffer buffer = ByteBuffer.allocate(1024); - + public KeyContext(NioChannelContext channelContext, EmitterHandler handler) { this.channelContext = channelContext; this.handler = handler; } - + } - + private static abstract class Task { - + public static final int CLOSE = 1; public static final int CONNECT = 2; public static final int FLUSH = 3; - + private final int code; - + public Task(int code) { this.code = code; } - + } - + private final static class ConnectTask extends Task { - + private final SessionInfo sessionInfo; private final EmitterHandler handler; - + public ConnectTask(SessionInfo sessionInfo, EmitterHandler handler) { super(CONNECT); this.sessionInfo = sessionInfo; this.handler = handler; } - + } - + private final static class CloseTask extends Task { - + private final SelectionKey key; - + public CloseTask(SelectionKey key) { super(CLOSE); this.key = key; } - + } - + private final static class FlushTask extends Task { - + private final SelectionKey key; - + public FlushTask(SelectionKey key) { super(FLUSH); this.key = key; } - + } }