changeset 1234:e0985c791eae

EmitterHandler.dataReceived signature changed
author Devel 2
date Mon, 29 Jun 2020 14:12:03 +0200
parents 3cbe1f2437b0
children 5dc2ff622d6c
files stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/Timeouts.java stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java stress-tester/src/main/java/com/passus/st/emitter/Task.java stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java
diffstat 7 files changed, 49 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Mon Jun 29 14:12:03 2020 +0200
@@ -419,14 +419,14 @@
     }
 
     @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+    public boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception {
         FlowContext flowContext = context.getFlowContext();
         flowContext.lock();
         try {
             try {
                 FlowHandler client = flowContext.client();
                 FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                decoder.decode(data, flowContext);
+                int res = decoder.decode(data, flowContext);
 
                 long now = System.currentTimeMillis();
                 if (flowContext.receivedStartTimestamp() == -1) {
@@ -446,6 +446,7 @@
 
                     responseReceived0(flowContext, null, true);
                     decoder.clear(flowContext);
+                    throw new Exception("Decoder error. " + decoder.getLastError());
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
                     if (collectMetric) {
                         synchronized (metric) {
@@ -474,7 +475,10 @@
                     boolean blocked = filterChain.filterInbound(req, resp, flowContext) == Filter.DENY;
                     responseReceived0(flowContext, resp, blocked);
                     decoder.clear(flowContext);
+                    return true;
                 }
+
+                return false;
             } catch (Exception e) {
                 if (collectMetric) {
                     synchronized (metric) {
@@ -487,6 +491,7 @@
                 }
 
                 error(flowContext, FlowError.unknownError());
+                throw e;
             }
         } finally {
             flowContext.signalAndUnlock();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Mon Jun 29 14:12:03 2020 +0200
@@ -489,14 +489,14 @@
     }
 
     @Override
-    public void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+    public boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception {
         FlowContext flowContext = context.getFlowContext();
         flowContext.lock();
         try {
             try {
                 FlowHandler client = flowContext.client();
                 FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext);
-                decoder.decode(data, flowContext);
+                int res = decoder.decode(data, flowContext);
 
                 long now = timeGenerator.currentTimeMillis();
                 if (flowContext.receivedStartTimestamp() == -1) {
@@ -516,6 +516,7 @@
 
                     decoder.clear(flowContext);
                     responseReceived0(flowContext, null);
+                    throw new Exception("Decoder error. " + decoder.getLastError());
                 } else if (decoder.state() == DataDecoder.STATE_FINISHED) {
                     if (collectMetrics) {
                         synchronized (metric) {
@@ -540,7 +541,10 @@
 
                     decoder.clear(flowContext);
                     responseReceived0(flowContext, resp);
+                    return true;
                 }
+
+                return false;
             } catch (Exception e) {
                 if (collectMetrics) {
                     synchronized (metric) {
@@ -553,6 +557,7 @@
                 }
 
                 error(flowContext, FlowError.unknownError());
+                throw e;
             }
         } finally {
             flowContext.signalAndUnlock();
@@ -726,7 +731,6 @@
     }
 
 
-
     protected void processTimeouts() {
         try {
             long now = timeGenerator.currentTimeMillis();
--- a/stress-tester/src/main/java/com/passus/st/client/Timeouts.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/Timeouts.java	Mon Jun 29 14:12:03 2020 +0200
@@ -52,4 +52,14 @@
     public void setDisconnectTimeout(long disconnectTimeout) {
         this.disconnectTimeout = disconnectTimeout;
     }
+
+    @Override
+    public String toString() {
+        return "Timeouts{" +
+                "defaultTimeout=" + defaultTimeout +
+                ", connectionTimeout=" + connectionTimeout +
+                ", readTimeout=" + readTimeout +
+                ", disconnectTimeout=" + disconnectTimeout +
+                '}';
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/EmitterHandler.java	Mon Jun 29 14:12:03 2020 +0200
@@ -19,7 +19,8 @@
     default void channelInactive(ChannelContext context) throws Exception {
     }
 
-    default void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+    default boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+        return false;
     }
 
     default void dataWriteStart(ChannelContext context) {
--- a/stress-tester/src/main/java/com/passus/st/emitter/Task.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/Task.java	Mon Jun 29 14:12:03 2020 +0200
@@ -29,4 +29,12 @@
                 throw new Error("Invalid task code.");
         }
     }
+
+    @Override
+    public String toString() {
+        return "Task{" +
+                "code=" + codeString() +
+                '}';
+    }
+
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/socket/Connection.java	Mon Jun 29 14:12:03 2020 +0200
@@ -102,31 +102,26 @@
     private void doRead() {
         buffer.clear();
 
-        int totalRead = 0;
         try {
             int read;
             while ((read = read(buffer)) != -1) {
-                totalRead += read;
+                if (read > 0) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Read {}B ({} -> {})", read,
+                                channelContext.getLocalAddress(), channelContext.getRemoteAddress());
+                    }
+                    try {
+                        if (handler.dataReceived(channelContext, buffer)) {
+                            break;
+                        }
+                    } catch (Exception e) {
+                        logger.debug(e.getMessage(), e);
+                    }
+                }
             }
-
         } catch (IOException e) {
             doCatchException(channelContext, e);
             doClose();
-            return;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Read {}B ({} -> {})", totalRead,
-                    channelContext.getLocalAddress(), channelContext.getRemoteAddress());
-        }
-
-        if (buffer.readableBytes() > 0) {
-            try {
-                handler.dataReceived(channelContext, buffer);
-                logger.debug("Read handled.");
-            } catch (Exception e) {
-                logger.debug(e.getMessage(), e);
-            }
         }
     }
 
--- a/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Mon Jun 29 11:25:32 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/client/TestClientHandler.java	Mon Jun 29 14:12:03 2020 +0200
@@ -152,12 +152,13 @@
     }
 
     @Override
-    public final void dataReceived(ChannelContext context, ByteBuff data) throws Exception {
+    public final boolean dataReceived(ChannelContext context, ByteBuff data) throws Exception {
         synchronized (this) {
             ClientEvent event = ClientEvent.create(EventType.DATA_RECEIVED, context);
             event.setData(data);
             add(event);
             doDataReceived(context, data);
+            return false;
         }
     }