Mercurial > stress-tester
changeset 1117:7e874d9af91a
HTTP2 - decoder, encoder in progress
author | Devel 2 |
---|---|
date | Tue, 02 Jun 2020 11:35:56 +0200 |
parents | f8522f874d0f |
children | a9d9c6e9bf19 |
files | stress-tester/src/main/java/com/passus/st/client/http/Http2FlowContext.java stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandler.java stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandlerDataDecoder.java stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandlerDataEncoder.java stress-tester/src/main/java/com/passus/st/client/http/Http2Stream.java |
diffstat | 5 files changed, 320 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/Http2FlowContext.java Tue Jun 02 11:35:56 2020 +0200 @@ -0,0 +1,23 @@ +package com.passus.st.client.http; + +import com.passus.net.http2.Http2Settings; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +import static com.passus.net.http2.Http2Utils.SETTINGS_DEFAULT_WINDOW_SIZE; + +public class Http2FlowContext { + + final Int2ObjectOpenHashMap<Http2Stream> streams = new Int2ObjectOpenHashMap<>(); + + Http2Settings localSettings = new Http2Settings(); + + Http2Settings remoteSettings = new Http2Settings(); + + boolean remoteSettingsAckNeeded = false; + + int localWindowSize = SETTINGS_DEFAULT_WINDOW_SIZE; + + int remoteWindowSize = SETTINGS_DEFAULT_WINDOW_SIZE; + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandler.java Tue Jun 02 11:35:56 2020 +0200 @@ -0,0 +1,54 @@ +package com.passus.st.client.http; + +import com.passus.commons.time.TimeAware; +import com.passus.commons.time.TimeGenerator; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandler; +import com.passus.st.client.FlowHandlerDataDecoder; +import com.passus.st.client.FlowHandlerDataEncoder; +import com.passus.st.metric.MetricsContainer; + +public class Http2FlowHandler implements FlowHandler, TimeAware { + + boolean collectMetrics = false; + + @Override + public TimeGenerator getTimeGenerator() { + return null; + } + + @Override + public void setTimeGenerator(TimeGenerator generator) { + + } + + @Override + public int getProtocolId() { + return 0; + } + + @Override + public FlowHandlerDataDecoder getResponseDecoder(FlowContext flowContext) { + return null; + } + + @Override + public FlowHandlerDataEncoder getRequestEncoder(FlowContext flowContext) { + return null; + } + + @Override + public boolean isCollectMetrics() { + return false; + } + + @Override + public void setCollectMetrics(boolean collectMetrics) { + + } + + @Override + public void writeMetrics(MetricsContainer container) { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandlerDataDecoder.java Tue Jun 02 11:35:56 2020 +0200 @@ -0,0 +1,197 @@ +package com.passus.st.client.http; + +import com.passus.data.*; +import com.passus.net.FixedSizeLengthPduX; +import com.passus.net.FixedSizeLengthPduXHandler; +import com.passus.net.http.*; +import com.passus.net.http2.*; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataDecoder; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +import static com.passus.data.DataDecoder.STATE_DATA_NEEDED; +import static com.passus.data.DataDecoder.STATE_FINISHED; +import static com.passus.net.http2.Http2Utils.*; + +public class Http2FlowHandlerDataDecoder implements FlowHandlerDataDecoder<HttpResponse> { + + private final Logger LOGGER = LogManager.getLogger(Http2FlowHandlerDataDecoder.class); + + private final Http2FrameDecoder frameDecoder = new Http2FrameDecoder(); + + private HttpResponse resp; + + private final Http2FlowContext context; + + private int state = STATE_DATA_NEEDED; + + private final FixedSizeLengthPduX pdu; + + private List<Http2Frame> streamFrames; + + public Http2FlowHandlerDataDecoder(Http2FlowContext context) { + this.context = context; + pdu = new FixedSizeLengthPduX(new PduHandler(), 3, 6); + } + + @Override + public HttpResponse getResult() { + return resp; + } + + @Override + public int state() { + return 0; + } + + @Override + public String getLastError() { + return null; + } + + @Override + public int decode(ByteBuff buffer, FlowContext flowContext) { + int res = frameDecoder.decode(buffer); + if (frameDecoder.state() == STATE_FINISHED) { + + } + + return res; + } + + private class PduHandler implements FixedSizeLengthPduXHandler { + + @Override + public void decodeBlockLength(byte[] data, int offset, MutableInt value) { + value.setValue(DataUtils.getInt3(data, offset)); + } + + private void processFrame(Http2Frame frame) { + + + switch (frame.getType()) { + case FRAME_WINDOW_UPDATE: + Http2WindowUpdateFrame windowUpdateFrame = (Http2WindowUpdateFrame) frame; + if (frame.getStreamId() == 0) { + context.remoteWindowSize = windowUpdateFrame.getWindowSizeIncrement(); + } else { + Http2Stream stream = context.streams.get(frame.getStreamId()); + if (stream == null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Unable to update remote window size. Unknown stream '" + frame.getStreamId() + "'"); + } + } else { + stream.remoteWindowSize = windowUpdateFrame.getWindowSizeIncrement(); + } + } + + break; + case Http2Utils.FRAME_SETTINGS: + Http2SettingsFrame settingsFrame = (Http2SettingsFrame) frame; + if (settingsFrame.getStreamId() == 0) { + if (settingsFrame.getSettings() != null) { + context.remoteSettings = settingsFrame.getSettings(); + context.remoteSettingsAckNeeded = true; + } + } else if (LOGGER.isDebugEnabled()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Http2SettingsFrame.streamId != 0"); + } + } + break; + default: + processOther(frame); + } + } + + private void processOther(Http2Frame frame) { + int streamId = frame.getStreamId(); + if (streamId == 0) { + return; + } + + Http2Stream stream = context.streams.get(streamId); + if (stream == null) { + long maxConcurrentStreams = context.remoteSettings.maxConcurrentStreams(); + if (maxConcurrentStreams != Http2Settings.UNAVAILABLE + && context.streams.size() >= maxConcurrentStreams) { + LOGGER.debug("Max concurrent streams " + maxConcurrentStreams + " exceeded."); + return; + } + + stream = new Http2Stream(); + stream.remoteWindowSize = context.remoteSettings.initialWindowSize(); + context.streams.put(streamId, stream); + } + stream.frames.add(frame); + + if ((frame.getFlags() & FLAG_END_STREAM) == FLAG_END_STREAM) { + context.streams.remove(streamId); + resp = assemble(stream.frames); + streamFrames = stream.frames; + } + } + + HttpResponse assemble(List<Http2Frame> frames) { + HttpHeadersImpl headers = new HttpHeadersImpl(); + List<Http2DataFrame> dataFrames = null; + int contentLength = 0; + for (Http2Frame frame : frames) { + if (frame.getType() == FRAME_HEADERS) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame; + headers.addAll(headersFrame.getHeaders()); + } else if (frame.getType() == FRAME_DATA) { + if (dataFrames == null) { + dataFrames = new ArrayList<>(); + } + Http2DataFrame dataFrame = (Http2DataFrame) frame; + dataFrames.add(dataFrame); + byte[] data = dataFrame.getData(); + contentLength += data.length; + } + } + + HttpResponse resp; + ByteString statusStr = headers.get(PSEUDO_HEADER_STATUS); + int statusCode = ByteStringUtils.parseInt(statusStr); + HttpStatus status = HttpStatus.valueOf(statusCode); + + resp = new HttpResponse(headers); + resp.setStatus(status); + + if (dataFrames != null) { + int contentOffset = 0; + byte[] content = new byte[contentLength]; + for (Http2DataFrame dataFrame : dataFrames) { + byte[] data = dataFrame.getData(); + System.arraycopy(data, 0, content, contentOffset, data.length); + contentOffset += data.length; + } + resp.setContent(new ByteBuffDataSource(content, false)); + } + + state = STATE_FINISHED; + return resp; + } + + @Override + public void onNewPdu(byte[] data, int offset, int length) { + frameDecoder.decode(data, offset, length); + if (frameDecoder.state() == DataDecoder.STATE_ERROR) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("HTTP2 Frame decoder error. " + frameDecoder.getLastError()); + } + + frameDecoder.clear(); + } else if (frameDecoder.state() == STATE_FINISHED) { + processFrame(frameDecoder.getResult()); + frameDecoder.clear(); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/Http2FlowHandlerDataEncoder.java Tue Jun 02 11:35:56 2020 +0200 @@ -0,0 +1,22 @@ +package com.passus.st.client.http; + +import com.passus.data.ByteBuff; +import com.passus.net.http.HttpRequest; +import com.passus.st.client.FlowContext; +import com.passus.st.client.FlowHandlerDataEncoder; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +public class Http2FlowHandlerDataEncoder implements FlowHandlerDataEncoder<HttpRequest> { + + + + private int nextStreamId = 1; + + private int windowSize = 1; + + @Override + public void encode(HttpRequest request, FlowContext flowContext, ByteBuff out) { + + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/client/http/Http2Stream.java Tue Jun 02 11:35:56 2020 +0200 @@ -0,0 +1,24 @@ +package com.passus.st.client.http; + +import com.passus.net.http2.Http2Frame; + +import java.util.ArrayList; +import java.util.List; + +import static com.passus.net.http2.Http2Utils.SETTINGS_DEFAULT_WINDOW_SIZE; + +public class Http2Stream { + + public static final int STATE_IDLE = 0; + public static final int STATE_RESERVED = 1; + public static final int STATE_OPEN = 2; + public static final int STATE_HALF_CLOSED = 3; + public static final int STATE_CLOSED = 4; + + int state = STATE_IDLE; + + int remoteWindowSize = SETTINGS_DEFAULT_WINDOW_SIZE; + + List<Http2Frame> frames = new ArrayList<>(); + +}