changeset 1038:9dff4810b5b8

NcEventSource - DNS and Netflow support
author Devel 2
date Fri, 10 Apr 2020 10:22:01 +0200
parents e62df9b84df7
children 58860e741b9e
files stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/reader/nc/NcDnsPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpPayloadReader.java stress-tester/src/main/java/com/passus/st/reader/nc/NcNetflowPayloadReader.java stress-tester/src/main/java/com/passus/st/source/NcEventSource.java stress-tester/src/main/java/com/passus/st/source/NcPayloadReader.java stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadReaderTest.java stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadWriterTest.java stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java stress-tester/src/test/java/com/passus/st/utils/Assert.java
diffstat 10 files changed, 209 insertions(+), 119 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Fri Apr 10 10:22:01 2020 +0200
@@ -1,5 +1,9 @@
 package com.passus.st;
 
+import com.passus.commons.utils.ArrayUtils;
+
+import java.util.Set;
+
 public class Protocols {
 
     public static final int UNKNOWN = 0;
@@ -12,6 +16,10 @@
     private Protocols() {
     }
 
+    public static Set<Integer> getAllProtocols() {
+        return ArrayUtils.asSet(HTTP, DNS, NETFLOW, PGSQL, MYSQL);
+    }
+
     public static String protocolToString(int protocolId) {
         switch (protocolId) {
             case HTTP:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcDnsPayloadReader.java	Fri Apr 10 10:22:01 2020 +0200
@@ -0,0 +1,51 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.net.dns.Dns;
+import com.passus.net.dns.DnsDecoder;
+import com.passus.st.source.NcPayloadReader;
+
+import java.io.IOException;
+
+/**
+ * Payload reader that decodes DNS messages from NC payload data.
+ * The same decoding routine serves both the request and the response part.
+ */
+public class NcDnsPayloadReader extends NcPayloadReader<Dns, Dns> {
+
+    private final DnsDecoder decoder = new DnsDecoder();
+
+    /**
+     * Runs the shared decoder over the payload and advances the payload by the
+     * number of bytes the decoder reported.
+     *
+     * @param payload buffer holding the DNS message to decode
+     * @return the decoded message
+     * @throws IOException if the decoder does not end in the finished state
+     */
+    private Dns decode(ByteBuff payload) throws IOException {
+        try {
+            int consumed = decoder.decode(payload);
+            payload.skipBytes(consumed);
+            if (decoder.state() != DataDecoder.STATE_FINISHED) {
+                throw new IOException("Invalid Dns data.");
+            }
+
+            return decoder.getResult();
+        } finally {
+            // The decoder instance is reused, so it is reset after every attempt.
+            decoder.clear();
+        }
+    }
+
+    @Override
+    protected Dns decodeRequest(ByteBuff payload) throws IOException {
+        return decode(payload);
+    }
+
+    @Override
+    protected Dns decodeResponse(ByteBuff payload) throws IOException {
+        return decode(payload);
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpPayloadReader.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcHttpPayloadReader.java	Fri Apr 10 10:22:01 2020 +0200
@@ -3,20 +3,16 @@
 import com.passus.commons.Assert;
 import com.passus.data.*;
 import com.passus.net.http.*;
-import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.client.http.HttpReqResp;
 import com.passus.st.source.NcPayloadReader;
 
 import java.io.IOException;
 
-import static com.passus.st.reader.nc.NcDataUtils.*;
+import static com.passus.st.reader.nc.NcDataUtils.CUSTOM_HEADER_CODE;
 
 /**
  * @author Mirosław Hawrot
  */
-public class NcHttpPayloadReader implements NcPayloadReader {
-
-    private final NcDataHelper ncDataHelper = NcDataHelper.getInstance();
+public class NcHttpPayloadReader extends NcPayloadReader<HttpRequest, HttpResponse> {
 
     private ByteBuffAllocator allocator = new DefaultByteBuffAllocator();
 
@@ -29,20 +25,6 @@
         this.allocator = allocator;
     }
 
-    private void skipRequest(ByteBuff buffer) throws IOException {
-        ncDataHelper.skipByteStringNullTerminated(buffer);
-        ncDataHelper.skipByteStringNullTerminated(buffer);
-        buffer.read();
-        skipHeaders(buffer);
-    }
-
-    private void skipResponse(ByteBuff buffer) throws IOException {
-        buffer.skipBytes(2);
-        ncDataHelper.skipByteStringNullTerminated(buffer);
-        buffer.read();
-        skipHeaders(buffer);
-    }
-
     public int decodeVersion(ByteBuff buffer) throws IOException {
         byte b = buffer.read();
         if (b == NcDataUtils.VERSION_1_0) {
@@ -93,7 +75,7 @@
         }
     }
 
-    public HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException {
+    private HttpHeaders decodeHeaders(ByteBuff buffer) throws IOException {
         long headerSize = ncDataHelper.readLongVLC(buffer);
         HttpHeaders headers;
         if (HttpHeadersDecoder.isDefaultEnableCachedHeaders()) {
@@ -145,7 +127,7 @@
         }
     }
 
-    public HttpRequest decodeRequest(ByteBuff buffer) throws IOException {
+    private HttpRequest decodeRequestHeaders(ByteBuff buffer) throws IOException {
         HttpMethod method = decodeMethod(buffer);
         ByteString uri = ncDataHelper.readByteStringNullTerminated(buffer);
         HttpRequest req = new HttpRequest(uri, method);
@@ -154,13 +136,13 @@
         return req;
     }
 
-    public HttpRequest decodeFullRequest(ByteBuff buffer) throws IOException {
-        HttpRequest req = decodeRequest(buffer);
+    protected HttpRequest decodeRequest(ByteBuff buffer) throws IOException {
+        HttpRequest req = decodeRequestHeaders(buffer);
         req.setContent(decodeContent(buffer));
         return req;
     }
 
-    public HttpResponse decodeResponse(ByteBuff buffer) throws IOException {
+    private HttpResponse decodeResponseHeaders(ByteBuff buffer) throws IOException {
         int statusCode = ncDataHelper.readInt2(buffer);
         ByteString reasonPhrase = ncDataHelper.readByteStringNullTerminated(buffer);
         HttpStatus status = new HttpStatus(statusCode, reasonPhrase);
@@ -170,56 +152,10 @@
         return resp;
     }
 
-    public HttpResponse decodeFullResponse(ByteBuff buffer) throws IOException {
-        HttpResponse resp = decodeResponse(buffer);
+    protected HttpResponse decodeResponse(ByteBuff buffer) throws IOException {
+        HttpResponse resp = decodeResponseHeaders(buffer);
         resp.setContent(decodeContent(buffer));
         return resp;
     }
 
-    public HttpMessage decodeMessage(ByteBuff buffer) throws IOException {
-        byte flags = buffer.read();
-
-        HttpMessage msg;
-        if ((flags & FLAG_REQUEST) != 0) {
-            msg = decodeRequest(buffer);
-        } else {
-            msg = decodeResponse(buffer);
-        }
-
-        return msg;
-    }
-
-    public HttpReqResp decodeMessages(ByteBuff buffer) throws IOException {
-        return decodeMessages(buffer, false, false);
-    }
-
-    public HttpReqResp decodeMessages(ByteBuff buffer, boolean skipRequest, boolean skipResponse) throws IOException {
-        byte flags = buffer.read();
-        HttpRequest req = null;
-        if ((flags & FLAG_REQUEST) != 0) {
-            if (skipRequest) {
-                skipRequest(buffer);
-            } else {
-                req = decodeFullRequest(buffer);
-            }
-        }
-
-        HttpResponse resp = null;
-        if ((flags & FLAG_RESPONSE) != 0) {
-            if (skipResponse) {
-                skipResponse(buffer);
-            } else {
-                resp = decodeFullResponse(buffer);
-            }
-        }
-
-        buffer.release();
-        return new HttpReqResp(req, resp);
-    }
-
-    @Override
-    public SessionPayloadEvent read(NcDataBlockReader reader) throws IOException {
-        return null;
-    }
-
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/reader/nc/NcNetflowPayloadReader.java	Fri Apr 10 10:22:01 2020 +0200
@@ -0,0 +1,87 @@
+package com.passus.st.reader.nc;
+
+import com.passus.data.ByteBuff;
+import com.passus.data.DataDecoder;
+import com.passus.data.DataUtils;
+import com.passus.net.netflow.Netflow;
+import com.passus.net.netflow.Netflow5Decoder;
+import com.passus.net.netflow.Netflow9Decoder;
+import com.passus.net.netflow.NetflowUtils;
+import com.passus.st.source.NcPayloadReader;
+
+import java.io.IOException;
+
+/**
+ * Payload reader that decodes Netflow v5 and v9 packets from NC payload data.
+ * Only {@link #decodeRequest(ByteBuff)} produces a value; the response is always {@code null}.
+ */
+public class NcNetflowPayloadReader extends NcPayloadReader<Netflow, Netflow> {
+
+    private final Netflow5Decoder decoder5 = new Netflow5Decoder();
+
+    private final Netflow9Decoder decoder9 = new Netflow9Decoder();
+
+    /**
+     * Reads the version with {@code DataUtils.getInt2}, skips two bytes and passes the
+     * remaining readable bytes to the decoder that handles that version.
+     *
+     * @param payload buffer holding the Netflow packet, starting at its version field
+     * @return the decoded packet, or {@code null} when the version is neither 5 nor 9
+     * @throws IOException if the selected decoder does not end in the finished state
+     */
+    @Override
+    protected Netflow decodeRequest(ByteBuff payload) throws IOException {
+        int version = DataUtils.getInt2(payload);
+        payload.skipBytes(2);
+        if (version == NetflowUtils.VERSION_5) {
+            return decodeV5(payload);
+        }
+
+        if (version == NetflowUtils.VERSION_9) {
+            return decodeV9(payload);
+        }
+
+        // Unknown versions are not reported as an error; the caller gets no request object.
+        return null;
+    }
+
+    /**
+     * Always returns {@code null}; this reader produces no response object.
+     */
+    @Override
+    protected Netflow decodeResponse(ByteBuff payload) throws IOException {
+        return null;
+    }
+
+    /** Decodes a Netflow v5 packet from the readable bytes of the payload. */
+    private Netflow decodeV5(ByteBuff payload) throws IOException {
+        try {
+            int consumed = decoder5.decode0(payload.buffer(), payload.startIndex(), payload.readableBytes());
+            payload.skipBytes(consumed);
+            if (decoder5.state() != DataDecoder.STATE_FINISHED) {
+                throw new IOException("Invalid Netflow v5 data.");
+            }
+
+            return decoder5.getResult();
+        } finally {
+            // The decoder instance is reused, so it is reset after every attempt.
+            decoder5.clear();
+        }
+    }
+
+    /** Decodes a Netflow v9 packet from the readable bytes of the payload. */
+    private Netflow decodeV9(ByteBuff payload) throws IOException {
+        try {
+            int consumed = decoder9.decode0(payload.buffer(), payload.startIndex(), payload.readableBytes());
+            payload.skipBytes(consumed);
+            if (decoder9.state() != DataDecoder.STATE_FINISHED) {
+                throw new IOException("Invalid Netflow v9 data.");
+            }
+
+            return decoder9.getResult();
+        } finally {
+            // The decoder instance is reused, so it is reset after every attempt.
+            decoder9.clear();
+        }
+    }
+}
--- a/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcEventSource.java	Fri Apr 10 10:22:01 2020 +0200
@@ -23,10 +23,7 @@
 import com.passus.st.client.SessionStatusEvent;
 import com.passus.st.emitter.SessionInfo;
 import com.passus.st.plugin.PluginConstants;
-import com.passus.st.reader.nc.NcDataBlockReader;
-import com.passus.st.reader.nc.NcHttpPayloadReader;
-import com.passus.st.reader.nc.NcSessionPayloadBlock;
-import com.passus.st.reader.nc.NcSessionStatusBlock;
+import com.passus.st.reader.nc.*;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -34,7 +31,7 @@
 import java.io.IOException;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
-import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.Protocols.*;
 
 /**
  * @author Mirosław Hawrot
@@ -57,6 +54,10 @@
 
     private NcHttpPayloadReader httpReader = new NcHttpPayloadReader();
 
+    private NcNetflowPayloadReader netflowReader = new NcNetflowPayloadReader();
+
+    private NcDnsPayloadReader dnsReader = new NcDnsPayloadReader();
+
     private String name = UniqueIdGenerator.generate();
 
     private ReaderThread readerThread;
@@ -241,13 +242,26 @@
                 sessionInfo.setSourceName(getName());
 
                 ByteBuff payload = readPayload(payloadBlock.data());
-                ReqRespPair messages = null;
-                if (payloadBlock.proto() == HTTP) {
-                    messages = httpReader.decodeMessages(payload);
+                ReqRespPair messages;
+                switch (payloadBlock.proto()) {
+                    case HTTP:
+                        messages = httpReader.read(payload);
+                        break;
+                    case DNS:
+                        messages = dnsReader.read(payload);
+                        break;
+                    case NETFLOW:
+                        messages = netflowReader.read(payload);
+                        break;
+                    default:
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("Not supported protocol {}.", payloadBlock.proto());
+                        }
+
+                        return;
                 }
 
-
-                handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), HTTP, getName()));
+                handler.handle(new SessionPayloadEvent(sessionInfo, messages.getRequest(), messages.getResponse(), payloadBlock.proto(), getName()));
                 break;
             default:
                 reader.read();
--- a/stress-tester/src/main/java/com/passus/st/source/NcPayloadReader.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/NcPayloadReader.java	Fri Apr 10 10:22:01 2020 +0200
@@ -1,12 +1,55 @@
 package com.passus.st.source;
 
-import com.passus.st.client.SessionPayloadEvent;
-import com.passus.st.reader.nc.NcDataBlockReader;
+import com.passus.data.ByteBuff;
+import com.passus.st.ReqRespPair;
+import com.passus.st.reader.nc.NcDataHelper;
 
 import java.io.IOException;
 
-public interface NcPayloadReader {
+import static com.passus.st.reader.nc.NcDataUtils.FLAG_REQUEST;
+import static com.passus.st.reader.nc.NcDataUtils.FLAG_RESPONSE;
 
-    SessionPayloadEvent read(NcDataBlockReader reader) throws IOException;
-    
+/**
+ * Base class for NC payload readers. {@link #read(ByteBuff)} inspects the flags byte and
+ * delegates to the protocol-specific request and response decoders.
+ *
+ * @param <R> type of the decoded request
+ * @param <S> type of the decoded response
+ */
+public abstract class NcPayloadReader<R, S> {
+
+    protected final NcDataHelper ncDataHelper = NcDataHelper.getInstance();
+
+    /** Decodes the request part; called by {@link #read(ByteBuff)} when FLAG_REQUEST is set. */
+    protected abstract R decodeRequest(ByteBuff payload) throws IOException;
+
+    /** Decodes the response part; called by {@link #read(ByteBuff)} when FLAG_RESPONSE is set. */
+    protected abstract S decodeResponse(ByteBuff payload) throws IOException;
+
+    /**
+     * Reads the leading flags byte, decodes the request and/or response it announces,
+     * then releases the payload.
+     *
+     * @param payload buffer positioned at the flags byte
+     * @return pair of decoded messages; a side whose flag is not set is {@code null}
+     * @throws IOException if reading or decoding the payload fails
+     */
+    public ReqRespPair<R, S> read(ByteBuff payload) throws IOException {
+        byte flags = payload.read();
+        R req = null;
+        S resp = null;
+        if ((flags & FLAG_REQUEST) != 0) {
+            req = decodeRequest(payload);
+        }
+
+        if ((flags & FLAG_RESPONSE) != 0) {
+            resp = decodeResponse(payload);
+        }
+
+        // NOTE(review): release() is skipped when a decoder throws; confirm the caller
+        // releases the payload in that case.
+        payload.release();
+        return new ReqRespPair<>(req, resp);
+    }
+
 }
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadReaderTest.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadReaderTest.java	Fri Apr 10 10:22:01 2020 +0200
@@ -2,18 +2,18 @@
 
 import com.passus.data.ByteBuff;
 import com.passus.data.HeapByteBuff;
-import com.passus.net.http.HttpMessage;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpRequestBuilder;
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseBuilder;
-import org.testng.annotations.Test;
-import static com.passus.st.utils.HttpMessageAssert.*;
+import com.passus.st.ReqRespPair;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class NcHttpPayloadReaderTest {
@@ -51,8 +51,8 @@
         writer.encodeMessage(req, buffer);
 
         NcHttpPayloadReader reader = new NcHttpPayloadReader();
-        HttpMessage msgDecoded = reader.decodeMessage(buffer);
-        assertMessages(req, msgDecoded);
+        ReqRespPair<HttpRequest, HttpResponse> msgDecoded = reader.read(buffer);
+        assertMessages(req, msgDecoded.getRequest());
     }
 
     @Test
@@ -60,8 +60,8 @@
         writer.encodeMessage(resp, buffer);
 
         NcHttpPayloadReader reader = new NcHttpPayloadReader();
-        HttpMessage msgDecoded = reader.decodeMessage(buffer);
-        assertMessages(resp, msgDecoded);
+        ReqRespPair<HttpRequest, HttpResponse> msgDecoded = reader.read(buffer);
+        assertMessages(resp, msgDecoded.getResponse());
     }
 
     @Test
--- a/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadWriterTest.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/reader/nc/NcHttpPayloadWriterTest.java	Fri Apr 10 10:22:01 2020 +0200
@@ -2,6 +2,7 @@
 
 import com.passus.data.HeapByteBuff;
 import com.passus.net.http.*;
+import com.passus.st.ReqRespPair;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.client.http.HttpReqResp;
 import com.passus.st.emitter.SessionInfo;
@@ -85,7 +86,7 @@
 
             NcSessionPayloadBlock payload = readPayloadBlock(file);
             NcHttpPayloadReader payloadReader = new NcHttpPayloadReader();
-            HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data()));
+            ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data()));
             assertFalse(reqResp == null);
             assertTrue(reqResp.getResponse() == null);
             assertMessages(req, reqResp.getRequest());
@@ -107,7 +108,7 @@
 
             NcSessionPayloadBlock payload = readPayloadBlock(file);
             NcHttpPayloadReader payloadReader = new NcHttpPayloadReader();
-            HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data()));
+            ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data()));
             assertFalse(reqResp == null);
             assertTrue(reqResp.getResponse() == null);
             assertMessages(req, reqResp.getRequest());
@@ -129,7 +130,7 @@
 
             NcSessionPayloadBlock payload = readPayloadBlock(file);
             NcHttpPayloadReader payloadReader = new NcHttpPayloadReader();
-            HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data()));
+            ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data()));
             assertFalse(reqResp == null);
             assertTrue(reqResp.getResponse() == null);
             assertMessages(req, reqResp.getRequest());
@@ -151,7 +152,7 @@
 
             NcSessionPayloadBlock payload = readPayloadBlock(file);
             NcHttpPayloadReader payloadReader = new NcHttpPayloadReader();
-            HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data()));
+            ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data()));
             assertFalse(reqResp == null);
             assertTrue(reqResp.getRequest() == null);
             assertMessages(resp, reqResp.getResponse());
@@ -169,7 +170,7 @@
 
             NcSessionPayloadBlock payload = readPayloadBlock(file);
             NcHttpPayloadReader payloadReader = new NcHttpPayloadReader();
-            HttpReqResp reqResp = payloadReader.decodeMessages(toByteBuff(payload.data()));
+            ReqRespPair<HttpRequest, HttpResponse> reqResp = payloadReader.read(toByteBuff(payload.data()));
             assertFalse(reqResp == null);
             assertTrue(reqResp.getRequest() == null);
             assertTrue(reqResp.getResponse() == null);
--- a/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/source/NcEventSourceTest.java	Fri Apr 10 10:22:01 2020 +0200
@@ -1,30 +1,36 @@
 package com.passus.st.source;
 
-import static com.passus.commons.utils.ResourceUtils.createTmpFile;
 import com.passus.data.PooledByteBuffAllocator;
+import com.passus.net.PortRangeSet;
+import com.passus.st.Protocols;
 import com.passus.st.client.ArrayListEventHandler;
 import com.passus.st.client.Event;
 import com.passus.st.utils.EventUtils;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import static com.passus.st.utils.Assert.*;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
+
+import static com.passus.commons.utils.ResourceUtils.createTmpFile;
+import static com.passus.st.utils.Assert.assertEquals;
+import static com.passus.st.utils.Assert.assertEvents;
 
 /**
- *
  * @author Mirosław Hawrot
  */
 public class NcEventSourceTest {
 
     private FileEvents writeEvents(String pcapFile) throws IOException {
+        PortRangeSet portRanges = new PortRangeSet();
+        portRanges.add(4214).add(53).add(2055);
         Map<String, Object> props = new HashMap<>();
         props.put("allowPartialSession", true);
-        props.put("ports", 4214);
+        props.put("ports", portRanges);
+        props.put("protocols", Protocols.getAllProtocols());
         List<Event> events = EventUtils.readEvents(pcapFile, props);
 
         File tmpFile = createTmpFile(false);
@@ -39,16 +45,18 @@
     @DataProvider(name = "pcapFiles")
     public Object[][] pcapFiles() {
         return new Object[][]{
-            {"pcap/http/http_req_resp.pcap"},
-            {"pcap/http/http_ndiag_tcp_conn.pcap"},
-            {"pcap/http/basic_digest.pcap"},
-            {"pcap/http/http_get_png.pcap"},
-            {"pcap/http/http_1.pcap"}
+                {"pcap/http/http_req_resp.pcap"},
+                {"pcap/http/http_ndiag_tcp_conn.pcap"},
+                {"pcap/http/basic_digest.pcap"},
+                {"pcap/http/http_get_png.pcap"},
+                {"pcap/http/http_1.pcap"},
+                {"pcap/dns/dns_A_req_resp.pcap"},
+                {"pcap/netflow/netflow_v5.pcap"}
         };
     }
 
     @Test(dataProvider = "pcapFiles")
-    public void testRead(String pcapFile) throws Exception {
+    public void testWriteAndRead(String pcapFile) throws Exception {
         FileEvents fileEvents = writeEvents(pcapFile);
         try {
             ArrayListEventHandler handler = new ArrayListEventHandler();
--- a/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Thu Apr 09 13:49:19 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/utils/Assert.java	Fri Apr 10 10:22:01 2020 +0200
@@ -64,14 +64,19 @@
             Event event = events.get(i);
 
             assertEquals(expectedEvent.getType(), event.getType());
-            if (event.getType() == SessionPayloadEvent.TYPE && ((SessionPayloadEvent) event).getProtocolId() == HTTP) {
+            if (event.getType() == SessionPayloadEvent.TYPE) {
                 SessionPayloadEvent expectedPayloadEvent = (SessionPayloadEvent) expectedEvent;
                 SessionPayloadEvent payloadEvent = (SessionPayloadEvent) event;
 
-                assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
-                assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
-                assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
-                assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                if(((SessionPayloadEvent) event).getProtocolId() == HTTP) {
+                    assertMessages(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
+                    assertMessagesContent(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
+                    assertMessages(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                    assertMessagesContent(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                } else {
+                    assertEquals(expectedPayloadEvent.getRequest(), payloadEvent.getRequest());
+                    assertEquals(expectedPayloadEvent.getResponse(), payloadEvent.getResponse());
+                }
             }
         }
     }