Mercurial > stress-tester
changeset 963:3a4c9ca6593a
Refactorization in progress
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/ClientX.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,24 +0,0 @@ -package com.passus.st.client; - -import com.passus.config.Configurable; -import com.passus.st.metric.MetricSource; - -public interface ClientX extends MetricSource, Configurable { - - int getProtocolId(); - - void init(FlowContext flowContext); - - ClientXDataDecoder getResponseDecoder(FlowContext flowContext); - - ClientXDataEncoder getRequestEncoder(FlowContext flowContext); - - default void onDataWriteStart(FlowContext flowContext) { - - } - - default void onDataWriteEnd(FlowContext flowContext) { - - } - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXDataDecoder.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,19 +0,0 @@ -package com.passus.st.client; - -import com.passus.data.ByteBuff; - -public interface ClientXDataDecoder<T> { - - T getResult(); - - int state(); - - String getLastError(); - - default void clear(FlowContext flowContext) { - - } - - int decode(ByteBuff buffer, FlowContext flowContext); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXDataEncoder.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,9 +0,0 @@ -package com.passus.st.client; - -import com.passus.data.ByteBuff; - -public interface ClientXDataEncoder<T> { - - void encode(T request, FlowContext flowContext, ByteBuff out); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactory.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,7 +0,0 @@ -package com.passus.st.client; - -public interface ClientXFactory { - - ClientX create(int protocolId); - -}
--- a/stress-tester/src/main/java/com/passus/st/client/ClientXFactoryImpl.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,18 +0,0 @@ -package com.passus.st.client; - -import com.passus.st.client.http.HttpClientX; - -import static com.passus.st.Protocols.HTTP; - -public class ClientXFactoryImpl implements ClientXFactory { - - @Override - public ClientX create(int protocolId) { - switch (protocolId) { - case HTTP: - return new HttpClientX(); - } - - throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'."); - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/Flow.java Fri May 31 09:27:03 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/Flow.java Fri May 31 10:53:30 2019 +0200 @@ -9,9 +9,9 @@ public interface Flow extends EventHandler, MetricSource, Configurable { - ClientX getClient(); + FlowHandler getClient(); - void setClient(ClientX client); + void setClient(FlowHandler client); Emitter getEmitter();
--- a/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Fri May 31 09:27:03 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowContext.java Fri May 31 10:53:30 2019 +0200 @@ -41,7 +41,7 @@ private int loop; - private ClientX client; + private FlowHandler client; @Deprecated protected DataDecoder decoder; @@ -60,11 +60,11 @@ this.channelContext = channelContext; } - public ClientX client() { + public FlowHandler client() { return client; } - public void client(ClientX client) { + public void client(FlowHandler client) { this.client = client; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandler.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,35 @@ +package com.passus.st.client; + +import com.passus.config.Configurable; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.st.metric.MetricSource; + +/** + * TODO Dolecowo ma zastapic interfejs Client + */ +public interface FlowHandler extends MetricSource, Configurable { + + boolean DEFAULT_COLLECT_METRICS = true; + + int getProtocolId(); + + void init(FlowContext flowContext); + + FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext); + + FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext); + + default void onDataWriteStart(FlowContext flowContext) { + + } + + default void onDataWriteEnd(FlowContext flowContext) { + + } + + @Override + default void configure(Configuration config, ConfigurationContext context) { + setCollectMetrics(config.getBoolean("collectMetrics", DEFAULT_COLLECT_METRICS)); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataDecoder.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,19 @@ +package com.passus.st.client; + +import com.passus.data.ByteBuff; + +public interface FlowHandlerDataDecoder<T> { + + T getResult(); + + int state(); + + String getLastError(); + + default void clear(FlowContext flowContext) { + + } + + int decode(ByteBuff buffer, FlowContext flowContext); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerDataEncoder.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,9 @@ +package com.passus.st.client; + +import com.passus.data.ByteBuff; + +public interface FlowHandlerDataEncoder<T> { + + void encode(T request, FlowContext flowContext, ByteBuff out); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactory.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,7 @@ +package com.passus.st.client; + +public interface FlowHandlerFactory { + + FlowHandler create(int protocolId); + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowHandlerFactoryImpl.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,18 @@ +package com.passus.st.client; + +import com.passus.st.client.http.HttpFlowHandler; + +import static com.passus.st.Protocols.HTTP; + +public class FlowHandlerFactoryImpl implements FlowHandlerFactory { + + @Override + public FlowHandler create(int protocolId) { + switch (protocolId) { + case HTTP: + return new HttpFlowHandler(); + } + + throw new IllegalArgumentException("Not supported protocol '" + protocolId + "'."); + } +}
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri May 31 09:27:03 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorker.java Fri May 31 10:53:30 2019 +0200 @@ -25,7 +25,7 @@ protected final Emitter emitter; - protected ClientXFactory clientFactory = new ClientXFactoryImpl(); + protected FlowHandlerFactory clientFactory = new FlowHandlerFactoryImpl(); protected boolean collectMetric; @@ -65,11 +65,11 @@ this.filterChain = filterChain; } - public ClientXFactory getClientFactory() { + public FlowHandlerFactory getClientFactory() { return clientFactory; } - public void setClientFactory(ClientXFactory clientFactory) { + public void setClientFactory(FlowHandlerFactory clientFactory) { Assert.notNull(clientFactory, "clientFactory"); this.clientFactory = clientFactory; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Fri May 31 09:27:03 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBased.java Fri May 31 10:53:30 2019 +0200 @@ -194,7 +194,7 @@ FlowContext flowContext = createFlowContext(session); //TODO Malo optymalne - ClientX client = clientFactory.create(session.getProtocolId()); + FlowHandler client = clientFactory.create(session.getProtocolId()); client.init(flowContext); flowContext.client(client); sessions.put(session, flowContext); @@ -381,8 +381,8 @@ FlowContext flowContext = flowContext(context); try { if (flowContext != null) { - ClientX client = flowContext.client(); - ClientXDataDecoder decoder = client.getResponseDecoder(flowContext); + FlowHandler client = flowContext.client(); + FlowHandlerDataDecoder decoder = client.getResponseDecoder(flowContext); decoder.decode(data, flowContext); long now = timeGenerator.currentTimeMillis(); if (flowContext.receivedStartTimestamp() == -1) { @@ -481,8 +481,8 @@ return false; } - ClientX client = flowContext.client(); - ClientXDataEncoder encoder = client.getRequestEncoder(flowContext); + FlowHandler client = flowContext.client(); + FlowHandlerDataEncoder encoder = client.getRequestEncoder(flowContext); ByteBuff buffer = flowContext.buffer(); encoder.encode(req, flowContext, buffer);
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientX.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,97 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.commons.time.TimeAware; -import com.passus.commons.time.TimeGenerator; -import com.passus.config.Configuration; -import com.passus.config.ConfigurationContext; -import com.passus.net.http.HttpRequest; -import com.passus.st.client.ClientX; -import com.passus.st.client.ClientXDataDecoder; -import com.passus.st.client.ClientXDataEncoder; -import com.passus.st.client.FlowContext; -import com.passus.st.metric.MetricsContainer; - -import static com.passus.st.Protocols.HTTP; -import static com.passus.st.client.http.HttpConsts.TAG_TIME_END; -import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; - -public class HttpClientX implements ClientX, TimeAware { - - private final HttpClientXDataDecoder decoder; - - private final HttpClientXDataEncoder encoder; - - TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); - - boolean collectMetrics = false; - - HttpClientWorkerMetric metric; - - public HttpClientX() { - decoder = new HttpClientXDataDecoder(this); - encoder = new HttpClientXDataEncoder(); - } - - @Override - public int getProtocolId() { - return HTTP; - } - - @Override - public ClientXDataDecoder getResponseDecoder(FlowContext flowContext) { - return decoder; - } - - @Override - public ClientXDataEncoder getRequestEncoder(FlowContext flowContext) { - return encoder; - } - - @Override - public void init(FlowContext flowContext) { - //TODO Poprawic, w HttpScopes sa parametry globalne - flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes())); - } - - @Override - public TimeGenerator getTimeGenerator() { - return timeGenerator; - } - - @Override - public void setTimeGenerator(TimeGenerator timeGenerator) { - this.timeGenerator = timeGenerator; - } - - @Override - public void configure(Configuration config, ConfigurationContext context) { - - } - - @Override - public boolean isCollectMetrics() { - return collectMetrics; - } - - @Override - public void setCollectMetrics(boolean collectMetrics) { - this.collectMetrics = collectMetrics; - } - - @Override - public void writeMetrics(MetricsContainer container) { - - } - - @Override - public void onDataWriteStart(FlowContext flowContext) { - long now = timeGenerator.currentTimeMillis(); - ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now); - } - - @Override - public void onDataWriteEnd(FlowContext flowContext) { - long now = timeGenerator.currentTimeMillis(); - ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now); - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataDecoder.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.data.ByteBuff; -import com.passus.data.DataDecoder; -import com.passus.net.http.HttpFullMessageDecoder; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpResponse; -import com.passus.st.client.ClientXDataDecoder; -import com.passus.st.client.FlowContext; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import static com.passus.st.client.FlowUtils.debug; -import static com.passus.st.client.http.HttpConsts.*; -import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext; - -public class HttpClientXDataDecoder implements ClientXDataDecoder<HttpResponse> { - - private static final Logger LOGGER = LogManager.getLogger(HttpClientXDataDecoder.class); - - private final HttpFullMessageDecoder decoder; - - private final HttpClientX client; - - private HttpRequest lastRequest; - - public HttpClientXDataDecoder(HttpClientX client) { - this.client = client; - decoder = new HttpFullMessageDecoder(); - decoder.setDecodeRequest(false); - } - - @Override - public HttpResponse getResult() { - return (HttpResponse) decoder.getResult(); - } - - @Override - public int state() { - return decoder.state(); - } - - @Override - public String getLastError() { - return decoder.getLastError(); - } - - @Override - public void clear(FlowContext flowContext) { - if (lastRequest != null) { - extractHttpContext(flowContext).scopes().removeConversation(lastRequest); - lastRequest = null; - } - - decoder.clear(); - } - - @Override - public int decode(ByteBuff buffer, FlowContext flowContext) { - if (flowContext.sentEvent() != null) { - lastRequest = (HttpRequest) flowContext.sentEvent().getRequest(); - } - - if (lastRequest != null) { - decoder.setRequestMethod(lastRequest.getMethod()); - } - - int res = decoder.decode(buffer); - if (decoder.state() == DataDecoder.STATE_FINISHED) { - long now = client.timeGenerator.currentTimeMillis(); - HttpResponse resp = (HttpResponse) decoder.getResult(); - - if (LOGGER.isDebugEnabled()) { - debug(LOGGER, flowContext, - "Response decoded (size: {} B, downloaded: {} ms, status: {})", - decoder.getHeaderSize() + decoder.getContentSize(), - now - flowContext.receivedStartTimestamp(), - resp.getStatus().getCode() - ); - } - - if (client.isCollectMetrics()) { - synchronized (client.metric) { - client.metric.incResponsesNum(); - client.metric.addResponseStatusCode(resp.getStatus().getCode()); - client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); - client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); - if (lastRequest != null) { - client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); - } - } - } - - resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize()); - resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize()); - resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp()); - resp.setTag(TAG_TIME_END, now); - } - - return res; - } -}
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientXDataEncoder.java Fri May 31 09:27:03 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,26 +0,0 @@ -package com.passus.st.client.http; - -import com.passus.data.ByteBuff; -import com.passus.net.http.HttpRequest; -import com.passus.net.http.HttpRequestEncoder; -import com.passus.st.client.ClientXDataEncoder; -import com.passus.st.client.FlowContext; - -import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE; -import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE; - -public class HttpClientXDataEncoder implements ClientXDataEncoder<HttpRequest> { - - private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder(); - - @Override - public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) { - reqEncoder.encodeHeaders(request, out); - long headerSize = out.readableBytes(); - reqEncoder.encodeContent(request, out); - - request.setTag(TAG_HEADER_SIZE, headerSize); - request.setTag(TAG_CONTENT_SIZE, flowContext.buffer().readableBytes() - headerSize); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandler.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,90 @@ +package com.passus.st.client.http; + +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; +import com.passus.net.http.HttpRequest; +import com.passus.st.client.FlowHandler; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; +import com.passus.st.client.FlowContext; +import com.passus.st.metric.MetricsContainer; + +import static com.passus.st.Protocols.HTTP; +import static com.passus.st.client.http.HttpConsts.TAG_TIME_END; +import static com.passus.st.client.http.HttpConsts.TAG_TIME_START; + +public class HttpFlowHandler implements FlowHandler, TimeAware { + + private final HttpFlowHandlerDataDecoder decoder; + + private final HttpFlowHandlerDataEncoder encoder; + + TimeGenerator timeGenerator = TimeGenerator.getDefaultGenerator(); + + boolean collectMetrics = false; + + HttpClientWorkerMetric metric; + + public HttpFlowHandler() { + decoder = new HttpFlowHandlerDataDecoder(this); + encoder = new HttpFlowHandlerDataEncoder(); + } + + @Override + public int getProtocolId() { + return HTTP; + } + + @Override + public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { + return decoder; + } + + @Override + public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { + return encoder; + } + + @Override + public void init(FlowContext flowContext) { + //TODO Poprawic, w HttpScopes sa parametry globalne + flowContext.setParam(HttpFlowConst.PARAM_HTTP_CONTEXT, new HttpFlowContext(flowContext, new HttpScopes())); + } + + @Override + public TimeGenerator getTimeGenerator() { + return timeGenerator; + } + + @Override + public void setTimeGenerator(TimeGenerator timeGenerator) { + this.timeGenerator = timeGenerator; + } + + @Override + public boolean isCollectMetrics() { + return collectMetrics; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics = collectMetrics; + } + + @Override + public void writeMetrics(MetricsContainer container) { + + } + + @Override + public void onDataWriteStart(FlowContext flowContext) { + long now = timeGenerator.currentTimeMillis(); + ((HttpRequest) flowContext.sentEvent().getRequest()).setTag(TAG_TIME_START, now); + } + + @Override + public void onDataWriteEnd(FlowContext flowContext) { + long now = timeGenerator.currentTimeMillis(); + ((HttpRequest) (flowContext.sentEvent().getRequest())).setTag(TAG_TIME_END, now); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataDecoder.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,102 @@ +package com.passus.st.client.http; + +import com.passus.data.ByteBuff; +import com.passus.data.DataDecoder; +import com.passus.net.http.HttpFullMessageDecoder; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import static com.passus.st.client.FlowUtils.debug; +import static com.passus.st.client.http.HttpConsts.*; +import static com.passus.st.client.http.filter.HttpFlowUtils.extractHttpContext; + +public class HttpFlowHandlerDataDecoder implements FlowHandlerDataDecoder<HttpResponse> { + + private static final Logger LOGGER = LogManager.getLogger(HttpFlowHandlerDataDecoder.class); + + private final HttpFullMessageDecoder decoder; + + private final HttpFlowHandler client; + + private HttpRequest lastRequest; + + public HttpFlowHandlerDataDecoder(HttpFlowHandler client) { + this.client = client; + decoder = new HttpFullMessageDecoder(); + decoder.setDecodeRequest(false); + } + + @Override + public HttpResponse getResult() { + return (HttpResponse) decoder.getResult(); + } + + @Override + public int state() { + return decoder.state(); + } + + @Override + public String getLastError() { + return decoder.getLastError(); + } + + @Override + public void clear(FlowContext flowContext) { + if (lastRequest != null) { + extractHttpContext(flowContext).scopes().removeConversation(lastRequest); + lastRequest = null; + } + + decoder.clear(); + } + + @Override + public int decode(ByteBuff buffer, FlowContext flowContext) { + if (flowContext.sentEvent() != null) { + lastRequest = (HttpRequest) flowContext.sentEvent().getRequest(); + } + + if (lastRequest != null) { + decoder.setRequestMethod(lastRequest.getMethod()); + } + + int res = decoder.decode(buffer); + if (decoder.state() == DataDecoder.STATE_FINISHED) { + long now = client.timeGenerator.currentTimeMillis(); + HttpResponse resp = (HttpResponse) decoder.getResult(); + + if (LOGGER.isDebugEnabled()) { + debug(LOGGER, flowContext, + "Response decoded (size: {} B, downloaded: {} ms, status: {})", + decoder.getHeaderSize() + decoder.getContentSize(), + now - flowContext.receivedStartTimestamp(), + resp.getStatus().getCode() + ); + } + + if (client.isCollectMetrics()) { + synchronized (client.metric) { + client.metric.incResponsesNum(); + client.metric.addResponseStatusCode(resp.getStatus().getCode()); + client.metric.addResponseSize(decoder.getHeaderSize() + decoder.getContentSize()); + client.metric.addResponseReceivingTime(now - flowContext.receivedStartTimestamp()); + if (lastRequest != null) { + client.metric.addResponseTime(now - (long) lastRequest.getTag(TAG_TIME_START)); + } + } + } + + resp.setTag(TAG_HEADER_SIZE, decoder.getHeaderSize()); + resp.setTag(TAG_CONTENT_SIZE, decoder.getContentSize()); + resp.setTag(TAG_TIME_START, flowContext.receivedStartTimestamp()); + resp.setTag(TAG_TIME_END, now); + } + + return res; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowHandlerDataEncoder.java Fri May 31 10:53:30 2019 +0200 @@ -0,0 +1,26 @@ +package com.passus.st.client.http; + +import com.passus.data.ByteBuff; +import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpRequestEncoder; +import com.passus.st.client.FlowHandlerDataEncoder; +import com.passus.st.client.FlowContext; + +import static com.passus.st.client.http.HttpConsts.TAG_CONTENT_SIZE; +import static com.passus.st.client.http.HttpConsts.TAG_HEADER_SIZE; + +public class HttpFlowHandlerDataEncoder implements FlowHandlerDataEncoder<HttpRequest> { + + private final HttpRequestEncoder reqEncoder = new HttpRequestEncoder(); + + @Override + public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) { + reqEncoder.encodeHeaders(request, out); + long headerSize = out.readableBytes(); + reqEncoder.encodeContent(request, out); + + request.setTag(TAG_HEADER_SIZE, headerSize); + request.setTag(TAG_CONTENT_SIZE, flowContext.buffer().readableBytes() - headerSize); + } + +}