Mercurial > stress-tester
changeset 1223:fedf5100fdd3
SessionPayloadEvent.raw, FlowProcessor - raw session payload support
author | Devel 2 |
---|---|
date | Thu, 25 Jun 2020 11:28:37 +0200 |
parents | f666342e4ad9 |
children | cff85e9bf7a8 |
files | stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java |
diffstat | 2 files changed, 39 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Thu Jun 25 10:54:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowProcessor.java Thu Jun 25 11:28:37 2020 +0200 @@ -1,5 +1,6 @@ package com.passus.st.client; +import com.passus.commons.utils.FormatUtils; import com.passus.data.ByteBuff; import com.passus.data.DataDecoder; import com.passus.data.HeapByteBuff; @@ -336,26 +337,33 @@ return; } - ByteBuff buffer; - FlowHandler client = flowContext.client(); - FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); - buffer = flowContext.buffer(); - try { - encoder.encode(req, flowContext, buffer); + ByteBuff buffer = flowContext.buffer(); + if (event.isRaw()) { + byte[] data = (byte[]) event.getRequest(); + buffer.append(data); if (trace) { - logger.trace("Request encoded: " + req); + logger.trace("Raw data appended: " + FormatUtils.bytesToHexString(data, 0, 10)); } - } catch (Exception e) { - flowContext.encoderErrors++; - if (logger.isDebugEnabled()) { - debug(logger, flowContext, e.getMessage(), e); + } else { + FlowHandler client = flowContext.client(); + FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); + try { + encoder.encode(req, flowContext, buffer); + if (trace) { + logger.trace("Request encoded: " + req); + } + } catch (Exception e) { + flowContext.encoderErrors++; + if (logger.isDebugEnabled()) { + debug(logger, flowContext, e.getMessage(), e); + } + + if (flowContext.encoderErrors == maxEncoderErrors) { + error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached.")); + } + + return; } - - if (flowContext.encoderErrors == maxEncoderErrors) { - error(flowContext, new FlowError(CODE_MAX_ENCODER_ERRORS_REACHED, "Max encoder errors reached.")); - } - - return; } if (collectMetric) { @@ -372,7 +380,6 @@ flowContext.writeEndTime = -1; flowContext.channelContext().writeAndFlush(buffer); requestSent0(flowContext, event); - buffer.clear(); if (wait) { if (flowContext.isBidirectional()) { @@ -390,6 +397,8 @@ if (flowContext.sendErrors == maxSendErrors) { error(flowContext, new FlowError(CODE_MAX_SEND_ERRORS_REACHED, "Max send errors reached.")); } + } finally { + buffer.clear(); } } } catch (Exception e) {
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Thu Jun 25 10:54:28 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Thu Jun 25 11:28:37 2020 +0200 @@ -5,9 +5,9 @@ import static com.passus.st.Protocols.protocolToString; /** - * @author Mirosław Hawrot * @param <R> * @param <S> + * @author Mirosław Hawrot */ public class SessionPayloadEvent<R, S> extends SessionEvent { @@ -19,11 +19,18 @@ private final int protocolId; + private final boolean raw; + public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName) { + this(sessionInfo, request, response, protocolId, sourceName, false); + } + + public SessionPayloadEvent(SessionInfo sessionInfo, R request, S response, int protocolId, String sourceName, boolean raw) { super(sessionInfo, sourceName); this.request = request; this.response = response; this.protocolId = protocolId; + this.raw = raw; } @Override @@ -43,6 +50,10 @@ return response; } + public boolean isRaw() { + return raw; + } + @Override public SessionPayloadEvent instanceForWorker(int index) { return new SessionPayloadEvent(getSessionInfo(), request, response, protocolId, getSourceName());