changeset 667:78750e3f3ca5

NcEventDestination
author Devel 2
date Fri, 17 Nov 2017 14:27:25 +0100
parents 003105670d98
children c609aa9a8a2c
files stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java stress-tester/src/main/java/com/passus/st/source/EventDestination.java stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java
diffstat 4 files changed, 187 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Fri Nov 17 12:49:13 2017 +0100
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDataBlockWriter.java	Fri Nov 17 14:27:25 2017 +0100
@@ -1,6 +1,8 @@
 package com.passus.st.reader.nc;
 
 import com.passus.commons.Assert;
+import com.passus.data.ByteBuff;
+import com.passus.data.DataSource;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.reader.DataBlockWriter;
 import com.passus.st.reader.nc.block.NcDataBlock;
@@ -66,6 +68,16 @@
         this.maxSegmentBlocks = maxSegmentBlocks;
     }
 
+    private void write(ByteBuffer[] buffers) throws IOException { // writes all given buffers to ch with a single call
+        for (ByteBuffer buff : buffers) {
+            buff.flip(); // every buffer is flipped, so each one must still be in fill mode (filled with put*)
+        }
+        ch.write(buffers); // NOTE(review): the result is ignored - confirm the channel never performs a short write
+        for (ByteBuffer buff : buffers) {
+            buff.clear(); // reset position and limit so the buffers can be filled again
+        }
+    }
+
     private void writeBuffer() throws IOException {
         buffer.flip();
         ch.write(buffer);
@@ -107,16 +119,6 @@
         }
     }
 
-    private int writeHeader() throws IOException {
-        buffer.clear();
-        buffer.put(NcHeader.PREAMBULE);
-        buffer.put(NcDataUtils.VERSION_MAJOR);
-        buffer.put(NcDataUtils.VERSION_MINOR);
-        buffer.putLong(0L);
-        writeBuffer();
-        return NcHeader.SIZE;
-    }
-
     private int getSessionId(SessionInfo session, boolean writeIfNeeded) {
         Integer sessionId = sessionIdMap.get(session);
         if (writeIfNeeded && sessionId == null) {
@@ -132,6 +134,16 @@
         sessionIdMap.clear();
     }
 
+    private int writeHeader() throws IOException { // writes the header fields and returns NcHeader.SIZE
+        buffer.clear(); // start from an empty buffer
+        buffer.put(NcHeader.PREAMBULE);
+        buffer.put(NcDataUtils.VERSION_MAJOR);
+        buffer.put(NcDataUtils.VERSION_MINOR);
+        buffer.putLong(0L); // NOTE(review): eight zero bytes - looks like a reserved/placeholder field, confirm against NcHeader
+        writeBuffer();
+        return NcHeader.SIZE;
+    }
+
     private void writeSegmentBlock(long position) throws IOException {
         currentSegmentsBlocks = 0;
         currentSegmentSize = 0;
@@ -155,23 +167,23 @@
         return sessionId;
     }
 
-    public void writeSessionStatusBlock(long timestamp, SessionInfo session, byte status) {
+    public void writeSessionStatus(long timestamp, SessionInfo session, byte status) throws IOException {
         checkOpened();
 
         int sessionId = getSessionId(session, true);
-
         buffer.putLong(timestamp);
         buffer.putInt(sessionId);
         buffer.put(status);
+        // Write the status block (timestamp, session id, status) out right away.
+        writeBuffer();
     }
 
-    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, byte[] data) throws IOException {
+    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data) throws IOException { // delegates to the five-argument overload with flags = 0
         checkOpened();
-
         writeSessionPayload(timestamp, session, proto, data, (byte) 0);
     }
 
-    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, byte[] data, byte flags) throws IOException {
+    public void writeSessionPayload(long timestamp, SessionInfo session, byte proto, Object data, byte flags) throws IOException { // data: null, byte[], DataSource or ByteBuff
         checkOpened();
 
         int sessionId = sessionIdMap.get(session);
@@ -181,9 +193,53 @@
         buffer.putInt(sessionId);
         buffer.put(flags);
         buffer.put(proto);
-        buffer.putInt(data.length); //@TODO Length - zmien na format docelowy

-        ch.write(new ByteBuffer[]{buffer, ByteBuffer.wrap(data)});
+        // Resolve the payload into a ByteBuffer that is ready to be read
+        // (position at the first payload byte, limit just past the last one)
+        // and append the payload length to the block header.
+        ByteBuffer dataBuffer;
+        if (data == null) {
+            // No payload: the block consists of the header with a zero length.
+            buffer.putInt(0);
+            dataBuffer = null;
+        } else if (data instanceof byte[]) {
+            byte[] arrData = (byte[]) data;
+            buffer.putInt(arrData.length); //@TODO Length - change to the target format
+            dataBuffer = ByteBuffer.wrap(arrData);
+        } else if (data instanceof DataSource) {
+            DataSource dsData = (DataSource) data;
+            int length = dsData.available();
+            buffer.putInt(length);
+            dataBuffer = ByteBuffer.allocate(length);
+            // NOTE(review): presumably copies `length` bytes of the source into the
+            // backing array - confirm DataSource.write(byte[], int, int) semantics.
+            dsData.write(dataBuffer.array(), 0, length);
+        } else if (data instanceof ByteBuff) {
+            ByteBuff bbData = (ByteBuff) data;
+            // NOTE(review): assumes the returned buffer exposes exactly the
+            // readableBytes() of bbData between position and limit - confirm.
+            dataBuffer = bbData.toNioByteBuffer(false);
+            buffer.putInt(bbData.readableBytes());
+        } else {
+            throw new IllegalArgumentException("Not supported data class '" + data.getClass() + "'.");
+        }
+
+        if (dataBuffer == null) {
+            // Header only.
+            writeBuffer();
+            return;
+        }
+
+        // Only the header was filled with put*() and needs flip(). The payload
+        // buffer is already positioned for reading; flipping it would set its
+        // limit to 0 and silently drop the payload.
+        buffer.flip();
+        ByteBuffer[] buffers = {buffer, dataBuffer};
+        while (buffer.hasRemaining() || dataBuffer.hasRemaining()) {
+            // A gathering write may be partial, so repeat until both are drained.
+            ch.write(buffers);
+        }
+        buffer.clear();
     }
 
     @Override
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/EventDestination.java	Fri Nov 17 14:27:25 2017 +0100
@@ -0,0 +1,13 @@
+package com.passus.st.source;
+
+import com.passus.commons.service.Service;
+import com.passus.config.Configurable;
+import com.passus.st.client.EventHandler;
+
+/**
+ * Destination of events: a {@link Service} that is {@link Configurable} and consumes events as an {@link EventHandler}.
+ * @author Mirosław Hawrot
+ */
+public interface EventDestination extends Service, Configurable, EventHandler {
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventDestination.java	Fri Nov 17 14:27:25 2017 +0100
@@ -0,0 +1,189 @@
+package com.passus.st.source;
+
+import com.passus.commons.service.ServiceException;
+import com.passus.config.Configuration;
+import com.passus.data.ByteBuff;
+import com.passus.data.HeapByteBuff;
+import com.passus.net.http.HttpRequestEncoder;
+import com.passus.net.http.HttpResponseEncoder;
+import com.passus.st.client.DataEvents.DataEnd;
+import com.passus.st.client.Event;
+import com.passus.st.client.SessionStatusEvent;
+import com.passus.st.client.http.HttpSessionPayloadEvent;
+import com.passus.st.reader.nc.NcDataBlockWriter;
+
+/**
+ * {@link EventDestination} that writes the events it receives to a file
+ * through {@link NcDataBlockWriter}.
+ * <p>
+ * Session status events and HTTP payload events are written, a
+ * {@code DataEnd} event stops the destination and every other event type is
+ * ignored.
+ *
+ * @author Mirosław Hawrot
+ */
+public class NcEventDestination implements EventDestination {
+
+    /**
+     * Protocol id passed to the writer for encoded HTTP messages.
+     * NOTE(review): confirm the value against the protocol ids of the NC format.
+     */
+    private static final byte PROTO_HTTP = 1;
+
+    private String ncFile;
+
+    private volatile boolean started;
+
+    private NcDataBlockWriter writer;
+
+    private final HttpRequestEncoder requestEncoder = HttpRequestEncoder.getInstance();
+
+    private final HttpResponseEncoder responseEncoder = HttpResponseEncoder.getInstance();
+
+    /**
+     * @return the file passed to {@link NcDataBlockWriter} on start, or
+     * {@code null} if none was set
+     */
+    public String getNcFile() {
+        return ncFile;
+    }
+
+    /**
+     * Sets the file the writer is created with. Read only by {@link #start()},
+     * so changing it while started has no effect until the next start.
+     *
+     * @param ncFile file to write to
+     */
+    public void setNcFile(String ncFile) {
+        this.ncFile = ncFile;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Creates and opens the writer. Does nothing when already started.
+     *
+     * @throws ServiceException if no file was set or the writer cannot be
+     * created or opened
+     */
+    @Override
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        try {
+            if (ncFile == null) {
+                // configure() does not set the file, so fail with a clear message
+                // instead of handing null to the writer.
+                throw new IllegalStateException("NC file is not set.");
+            }
+
+            writer = new NcDataBlockWriter(ncFile);
+            writer.open();
+
+            started = true;
+        } catch (Exception e) {
+            stop0();
+            throw new ServiceException("Unable to start NcEventDestination. " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Closes the writer. Does nothing when not started.
+     */
+    @Override
+    public void stop() {
+        if (!started) {
+            return;
+        }
+
+        stop0();
+    }
+
+    /**
+     * Closes the writer if there is one, drops the reference to it and marks
+     * the destination as stopped.
+     */
+    private void stop0() {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (Exception ignored) {
+            // Deliberately best effort: stop0() also runs while start() is
+            // already failing, and a close error must not mask that failure.
+        } finally {
+            writer = null;
+            started = false;
+        }
+    }
+
+    private void checkStarted() {
+        if (!started) {
+            throw new IllegalStateException("Destination is not started.");
+        }
+    }
+
+    /**
+     * Currently a no-op: nothing is read from {@code config}. The output file
+     * has to be provided with {@link #setNcFile(String)}.
+     */
+    @Override
+    public void configure(Configuration config) {
+
+    }
+
+    /**
+     * Writes the event to the file, or stops the destination on {@code DataEnd}.
+     *
+     * @param event event to store
+     * @throws IllegalStateException if the destination is not started
+     * @throws RuntimeException if encoding or writing the event fails
+     */
+    @Override
+    public void handle(Event event) {
+        checkStarted();
+
+        try {
+            switch (event.getType()) {
+                case SessionStatusEvent.TYPE:
+                    SessionStatusEvent statusEvent = (SessionStatusEvent) event;
+                    writer.writeSessionStatus(statusEvent.getTimestamp(), statusEvent.getSessionInfo(), (byte) statusEvent.getStatus());
+                    break;
+                case HttpSessionPayloadEvent.TYPE:
+                    HttpSessionPayloadEvent payloadEvent = (HttpSessionPayloadEvent) event;
+
+                    // NOTE(review): request and response are written with the same
+                    // proto and default flags - confirm a reader can tell them apart.
+                    if (payloadEvent.getRequest() != null) {
+                        ByteBuff buff = new HeapByteBuff();
+                        requestEncoder.encode(payloadEvent.getRequest(), buff);
+                        writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), PROTO_HTTP, buff);
+                    }
+
+                    if (payloadEvent.getResponse() != null) {
+                        ByteBuff buff = new HeapByteBuff();
+                        responseEncoder.encode(payloadEvent.getResponse(), buff);
+                        writer.writeSessionPayload(payloadEvent.getTimestamp(), payloadEvent.getSessionInfo(), PROTO_HTTP, buff);
+                    }
+                    break;
+                case DataEnd.TYPE:
+                    stop();
+                    break;
+                default:
+                    // Event types that are not stored.
+                    break;
+            }
+        } catch (RuntimeException e) {
+            // Already unchecked: rethrow unchanged rather than wrapping it again.
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+}
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Fri Nov 17 12:49:13 2017 +0100
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcDataBlockWriterTest.java	Fri Nov 17 14:27:25 2017 +0100
@@ -36,7 +36,6 @@
         } finally {
             tmpFile.delete();
         }
-
     }
 
 }