Mercurial > stress-tester
changeset 667:78750e3f3ca5
NcEventDestination
author | Devel 2 |
---|---|
date | Fri, 17 Nov 2017 14:27:25 +0100 |
parents | 003105670d98 |
children | c609aa9a8a2c |
files | stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/source/EventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java |
diffstat | 4 files changed, 187 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Fri Nov 17 12:49:13 2017 +0100 +++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java Fri Nov 17 14:27:25 2017 +0100 @@ -1,6 +1,8 @@ package com.passus.st.reader.nc; import com.passus.commons.Assert; +import com.passus.data.ByteBuff; +import com.passus.data.DataSource; import com.passus.st.emitter.SessionInfo; import com.passus.st.reader.DataBlockWriter; import com.passus.st.reader.nc.block.NcDataBlock; @@ -66,6 +68,16 @@ this.maxSegmentBlocks = maxSegmentBlocks; } + private void write(ByteBuffer[] buffers) throws IOException { + for (ByteBuffer buff : buffers) { + buff.flip(); + } + ch.write(buffers); + for (ByteBuffer buff : buffers) { + buff.clear(); + } + } + private void writeBuffer() throws IOException { buffer.flip(); ch.write(buffer); @@ -107,16 +119,6 @@ } } - private int writeHeader() throws IOException { - buffer.clear(); - buffer.put(NcHeader.PREAMBULE); - buffer.put(NcDataUtils.VERSION_MAJOR); - buffer.put(NcDataUtils.VERSION_MINOR); - buffer.putLong(0L); - writeBuffer(); - return NcHeader.SIZE; - } - private int getSessionId(SessionInfo session, boolean writeIfNeeded) { Integer sessionId = sessionIdMap.get(session); if (writeIfNeeded && sessionId == null) { @@ -132,6 +134,16 @@ sessionIdMap.clear(); } + private int writeHeader() throws IOException { + buffer.clear(); + buffer.put(NcHeader.PREAMBULE); + buffer.put(NcDataUtils.VERSION_MAJOR); + buffer.put(NcDataUtils.VERSION_MINOR); + buffer.putLong(0L); + writeBuffer(); + return NcHeader.SIZE; + } + private void writeSegmentBlock(long position) throws IOException { currentSegmentsBlocks = 0; currentSegmentSize = 0; @@ -155,23 +167,23 @@ return sessionId; } - public void writeSessionStatusBlock(long timestamp, SessionInfo session, byte status) { + public void writeSessionStatus(long timestamp, SessionInfo session, byte status) throws IOException { checkOpened(); int sessionId = getSessionId(session, true); - buffer.putLong(timestamp); buffer.putInt(sessionId); buffer.put(status); + + writeBuffer(); } - public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, byte[] data) throws IOException { + public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException { checkOpened(); - writeSessionPayload(timestamp, session, proto, data, (byte) 0); } - public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, byte[] data, byte flags) throws IOException { + public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data, byte flags) throws IOException { checkOpened(); int sessionId = sessionIdMap.get(session); @@ -181,9 +193,34 @@ buffer.putInt(sessionId); buffer.put(flags); buffer.put(proto); - buffer.putInt(data.length); //@TODO Length - zmien na format docelowy - ch.write(new ByteBuffer[]{buffer, ByteBuffer.wrap(data)}); + ByteBuffer dataBuffer; + if (data == null) { + buffer.putInt(0); + dataBuffer = null; + } else if (data instanceof byte[]) { + byte[] arrData = (byte[]) data; + buffer.putInt(arrData.length); //@TODO Length - zmien na format docelowy + dataBuffer = ByteBuffer.wrap(arrData); + } else if (data instanceof DataSource) { + DataSource dsData = (DataSource) data; + int length = dsData.available(); + buffer.putInt(length); + dataBuffer = ByteBuffer.allocate(length); + dsData.write(dataBuffer.array(), 0, length); + } else if (data instanceof ByteBuff) { + ByteBuff bbData = (ByteBuff) data; + dataBuffer = bbData.toNioByteBuffer(false); + buffer.putInt(bbData.readableBytes()); + } else { + throw new IllegalArgumentException("Not supported data class '" + data.getClass() + "'."); + } + + if (dataBuffer == null) { + write(new ByteBuffer[]{buffer, dataBuffer}); + } else { + writeBuffer(); + } } @Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/EventDestination.java Fri Nov 17 14:27:25 2017 +0100 @@ -0,0 +1,13 @@ +package com.passus.st.source; + +import com.passus.commons.service.Service; +import com.passus.config.Configurable; +import com.passus.st.client.EventHandler; + +/** + * + * @author Mirosław Hawrot + */ +public interface EventDestination extends Service, Configurable, EventHandler { + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java Fri Nov 17 14:27:25 2017 +0100 @@ -0,0 +1,120 @@ +package com.passus.st.source; + +import com.passus.commons.service.ServiceException; +import com.passus.config.Configuration; +import com.passus.data.ByteBuff; +import com.passus.data.HeapByteBuff; +import com.passus.net.http.HttpRequestEncoder; +import com.passus.net.http.HttpResponseEncoder; +import com.passus.st.client.DataEvents.DataEnd; +import com.passus.st.client.Event; +import com.passus.st.client.SessionStatusEvent; +import com.passus.st.client.http.HttpSessionPayloadEvent; +import com.passus.st.reader.nc.NcDataBlockWriter; + +/** + * + * @author Mirosław Hawrot + */ +public class NcEventDestination implements EventDestination { + + private String ncFile; + + private volatile boolean started; + + private NcDataBlockWriter writer; + + private final HttpRequestEncoder requestEncoder = HttpRequestEncoder.getInstance(); + + private final HttpResponseEncoder responseEncoder = HttpResponseEncoder.getInstance(); + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void start() { + if (started) { + return; + } + + try { + writer = new NcDataBlockWriter(ncFile); + writer.open(); + + started = true; + } catch (Exception e) { + stop0(); + throw new ServiceException("Unable to start NcEventDestination. " + e.getMessage(), e); + } + + } + + @Override + public void stop() { + if (!started) { + return; + } + + stop0(); + } + + private void stop0() { + if (writer != null) { + try { + writer.close(); + } catch (Exception ignore) { + } + } + + started = false; + } + + private void checkStarted() { + if (!started) { + throw new IllegalStateException("Destination is not started."); + } + } + + @Override + public void configure(Configuration config) { + + } + + @Override + public void handle(Event event) { + checkStarted(); + + try { + switch (event.getType()) { + case SessionStatusEvent.TYPE: + SessionStatusEvent statusEvent = (SessionStatusEvent) event; + writer.writeSessionStatus(statusEvent.getTimestamp(), statusEvent.getSessionInfo(), (byte) statusEvent.getStatus()); + break; + case HttpSessionPayloadEvent.TYPE: + HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event; + + if (payloadEvent.getRequest() != null) { + ByteBuff buff = new HeapByteBuff(); + requestEncoder.encode(payloadEvent.getRequest(), buff); + writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff); + } + + if (payloadEvent.getResponse() != null) { + ByteBuff buff = new HeapByteBuff(); + responseEncoder.encode(payloadEvent.getResponse(), buff); + writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), (byte) 1, buff); + } + break; + case DataEnd.TYPE: + stop(); + break; + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + } + +}