changeset 1223:fedf5100fdd3

SessionPayloadEvent.raw, FlowProcessor - raw session payload support
author Devel 2
date Thu, 25 Jun 2020 11:28:37 +0200
parents f666342e4ad9
children cff85e9bf7a8
files stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java
diffstat 2 files changed, 39 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Thu Jun 25 10:54:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java	Thu Jun 25 11:28:37 2020 +0200
@@ -1,5 +1,6 @@
 package com.passus.st.client;
 
+import com.passus.commons.utils.FormatUtils;
 import com.passus.data.ByteBuff;
 import com.passus.data.DataDecoder;
 import com.passus.data.HeapByteBuff;
@@ -336,26 +337,33 @@
                     return;
                 }
 
-                ByteBuff buffer;
-                FlowHandler client = flowContext.client();
-                FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
-                buffer = flowContext.buffer();
-                try {
-                    encoder.encode(req, flowContext, buffer);
+                ByteBuff buffer = flowContext.buffer();
+                if (event.isRaw()) {
+                    byte[] data = (byte[]) event.getRequest();
+                    buffer.append(data);
                     if (trace) {
-                        logger.trace("Request encoded: " + req);
+                        logger.trace("Raw data appended: " + FormatUtils.bytesToHexString(data, 0, 10));
                     }
-                } catch (Exception e) {
-                    flowContext.encoderErrors++;
-                    if (logger.isDebugEnabled()) {
-                        debug(logger, flowContext, e.getMessage(), e);
+                } else {
+                    FlowHandler client = flowContext.client();
+                    FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext);
+                    try {
+                        encoder.encode(req, flowContext, buffer);
+                        if (trace) {
+                            logger.trace("Request encoded: " + req);
+                        }
+                    } catch (Exception e) {
+                        flowContext.encoderErrors++;
+                        if (logger.isDebugEnabled()) {
+                            debug(logger, flowContext, e.getMessage(), e);
+                        }
+
+                        if (flowContext.encoderErrors == maxEncoderErrors) {
+                            error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached."));
+                        }
+
+                        return;
                     }
-
-                    if (flowContext.encoderErrors == maxEncoderErrors) {
-                        error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached."));
-                    }
-
-                    return;
                 }
 
                 if (collectMetric) {
@@ -372,7 +380,6 @@
                     flowContext.writeEndTime = -1;
                     flowContext.channelContext().writeAndFlush(buffer);
                     requestSent0(flowContext, event);
-                    buffer.clear();
 
                     if (wait) {
                         if (flowContext.isBidirectional()) {
@@ -390,6 +397,8 @@
                     if (flowContext.sendErrors == maxSendErrors) {
                         error(flowContext, new FlowError(CODE_MAX_SEND_ERRORS_REACHED, "Max send errors reached."));
                     }
+                } finally {
+                    buffer.clear();
                 }
             }
         } catch (Exception e) {
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Thu Jun 25 10:54:28 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Thu Jun 25 11:28:37 2020 +0200
@@ -5,9 +5,9 @@
 import static com.passus.st.Protocols.protocolToString;
 
 /**
- * @author Mirosław Hawrot
  * @param <R>
  * @param <S>
+ * @author Mirosław Hawrot
  */
 public class SessionPayloadEvent<R, S> extends SessionEvent {
 
@@ -19,11 +19,18 @@
 
     private final int protocolId;
 
+    private final boolean raw;
+
     public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName) {
+        this(sessionInfo, request, response, protocolId, sourceName, false);
+    }
+
+    public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName, boolean raw) {
         super(sessionInfo, sourceName);
         this.request = request;
         this.response = response;
         this.protocolId = protocolId;
+        this.raw = raw;
     }
 
     @Override
@@ -43,6 +50,10 @@
         return response;
     }
 
+    public boolean isRaw() {
+        return raw;
+    }
+
     @Override
     public SessionPayloadEvent instanceForWorker(int index) {
         return new SessionPayloadEvent(getSessionInfo(), request, response, protocolId, getSourceName());