Mercurial > stress-tester
changeset 1234:e0985c791eae
EmitterHandler.dataReceived signature changed
author | Devel 2 |
---|---|
date | Mon, 29 Jun 2020 14:12:03 +0200 |
parents | 3cbe1f2437b0 |
children | 5dc2ff622d6c |
files | stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/Timeouts.java stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java stress-tester/src/main/java/com/passus/st/emitter/Task.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java |
diffstat | 7 files changed, 49 insertions(+), 25 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Mon Jun 29 14:12:03 2020 +0200 @@ -419,14 +419,14 @@ } @Override - public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + public boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception { FlowContext flowContext = context.getFlowContext(); flowContext.lock(); try { try { FlowHandler client = flowContext.client(); FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); - decoder.decode(data, flowContext); + int res = decoder.decode(data, flowContext); long now = System.currentTimeMillis(); if (flowContext.receivedStartTimestamp() == -1) { @@ -446,6 +446,7 @@ responseReceived0(flowContext, null, true); decoder.clear(flowContext); + throw new Exception("Decoder error. " + decoder.getLastError()); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { if (collectMetric) { synchronized (metric) { @@ -474,7 +475,10 @@ boolean blocked = filterChain.filterInbound(req, resp, flowContext) == Filter.DENY; responseReceived0(flowContext, resp, blocked); decoder.clear(flowContext); + return true; } + + return false; } catch (Exception e) { if (collectMetric) { synchronized (metric) { @@ -487,6 +491,7 @@ } error(flowContext, FlowError.unknownError()); + throw e; } } finally { flowContext.signalAndUnlock();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Mon Jun 29 14:12:03 2020 +0200 @@ -489,14 +489,14 @@ } @Override - public void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + public boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception { FlowContext flowContext = context.getFlowContext(); flowContext.lock(); try { try { FlowHandler client = flowContext.client(); FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); - decoder.decode(data, flowContext); + int res = decoder.decode(data, flowContext); long now = timeGenerator.currentTimeMillis(); if (flowContext.receivedStartTimestamp() == -1) { @@ -516,6 +516,7 @@ decoder.clear(flowContext); responseReceived0(flowContext, null); + throw new Exception("Decoder error. " + decoder.getLastError()); } else if (decoder.state() == DataDecoder.STATE_FINISHED) { if (collectMetrics) { synchronized (metric) { @@ -540,7 +541,10 @@ decoder.clear(flowContext); responseReceived0(flowContext, resp); + return true; } + + return false; } catch (Exception e) { if (collectMetrics) { synchronized (metric) { @@ -553,6 +557,7 @@ } error(flowContext, FlowError.unknownError()); + throw e; } } finally { flowContext.signalAndUnlock(); @@ -726,7 +731,6 @@ } - protected void processTimeouts() { try { long now = timeGenerator.currentTimeMillis();
--- a/stress-tester/src/main/java/com/passus/st/client/Timeouts.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java Mon Jun 29 14:12:03 2020 +0200 @@ -52,4 +52,14 @@ public void setDisconnectTimeout(long disconnectTimeout) { this.disconnectTimeout = disconnectTimeout; } + + @Override + public String toString() { + return "Timeouts{" + + "defaultTimeout=" + defaultTimeout + + ", connectionTimeout=" + connectionTimeout + + ", readTimeout=" + readTimeout + + ", disconnectTimeout=" + disconnectTimeout + + '}'; + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java Mon Jun 29 14:12:03 2020 +0200 @@ -19,7 +19,8 @@ default void channelInactive(ChannelContext context) throws Exception { } - default void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + default boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception { + return false; } default void dataWriteStart(ChannelContext context) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/Task.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/Task.java Mon Jun 29 14:12:03 2020 +0200 @@ -29,4 +29,12 @@ throw new Error("Invalid task code."); } } + + @Override + public String toString() { + return "Task{" + + "code=" + codeString() + + '}'; + } + }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java Mon Jun 29 14:12:03 2020 +0200 @@ -102,31 +102,26 @@ private void doRead() { buffer.clear(); - int totalRead = 0; try { int read; while ((read = read(buffer)) != -1) { - totalRead += read; + if (read > 0) { + if (logger.isDebugEnabled()) { + logger.debug("Read {}B ({} -> {})", read, + channelContext.getLocalAddress(), channelContext.getRemoteAddress()); + } + try { + if (handler.dataReceived(channelContext, buffer)) { + break; + } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } } - } catch (IOException e) { doCatchException(channelContext, e); doClose(); - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Read {}B ({} -> {})", totalRead, - channelContext.getLocalAddress(), channelContext.getRemoteAddress()); - } - - if (buffer.readableBytes() > 0) { - try { - handler.dataReceived(channelContext, buffer); - logger.debug("Read handled."); - } catch (Exception e) { - logger.debug(e.getMessage(), e); - } } }
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Mon Jun 29 11:25:32 2020 +0200 +++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java Mon Jun 29 14:12:03 2020 +0200 @@ -152,12 +152,13 @@ } @Override - public final void dataReceived(ChannelContext context, ByteBuff data) throws Exception { + public final boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception { synchronized (this) { ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context); event.setData(data); add(event); doDataReceived(context, data); + return false; } }