changeset 539:40b568fc8bec

Support for multiple pcap files
author Devel 2
date Tue, 12 Sep 2017 10:14:05 +0200
parents 83edfa7af445
children 55753289dafa
files stress-tester/src/main/java/com/passus/st/Main.java stress-tester/src/main/java/com/passus/st/client/AbstractEvent.java stress-tester/src/main/java/com/passus/st/client/DataEvents.java stress-tester/src/main/java/com/passus/st/client/Event.java stress-tester/src/main/java/com/passus/st/client/SessionEvent.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/client/http/HttpSourceNameAwareClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/client/http/ModuloHttpClientWorkerDispatcher.java stress-tester/src/main/java/com/passus/st/emitter/DynamicSessionMapper.java stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java
diffstat 17 files changed, 180 insertions(+), 116 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/Main.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Main.java	Tue Sep 12 10:14:05 2017 +0200
@@ -13,6 +13,7 @@
 import com.passus.st.client.http.HttpClient;
 import com.passus.st.client.http.HttpReporterClientListener;
 import com.passus.st.client.http.HttpReporterMetricHandler;
+import com.passus.st.client.http.HttpSourceNameAwareClientWorkerDispatcher;
 import com.passus.st.client.http.SummaryHttpClientListener;
 import com.passus.st.client.http.WriterHttpClientListener;
 import com.passus.st.client.http.filter.HttpFiltersConfigurator;
@@ -176,8 +177,8 @@
         try {
             CommandLine cl = new DefaultParser().parse(options, args);
             String[] clArgs = cl.getArgs();
-            if (clArgs.length != 1) {
-                System.err.println("Pcap file required.");
+            if (clArgs.length < 1) {
+                System.err.println("At least one pcap file required.");
                 printHelp(options);
                 return;
             }
@@ -216,6 +217,9 @@
             client.setCollectMetrics(true);
             client.setConnectPartialSession(cl.hasOption("ps"));
             client.setWokerType(cl.getOptionValue("wt", "synch"));
+            client.setWorkersNum(clArgs.length);
+            client.setDispatcher(new HttpSourceNameAwareClientWorkerDispatcher());
+
             if (cl.hasOption("pr")) {
                 int parallelReplays = Integer.parseInt(cl.getOptionValue("pr"));
                 if (parallelReplays > 0 && parallelReplays <= 100) {
@@ -299,18 +303,26 @@
 
             client.start();
 
-            PcapSessionEventSource eventSrc = new PcapSessionEventSource();
-            eventSrc.setPcapFile(clArgs[0]);
-            eventSrc.setAllowPartialSession(cl.hasOption("ps"));
-            eventSrc.setCollectMetrics(true);
+            PcapSessionEventSource[] eventSrcs = new PcapSessionEventSource[clArgs.length];
+
+            for (int i = 0; i < clArgs.length; i++) {
+                PcapSessionEventSource eventSrc = new PcapSessionEventSource();
+                eventSrc.setPcapFile(clArgs[i]);
+                eventSrc.setAllowPartialSession(cl.hasOption("ps"));
+                eventSrc.setCollectMetrics(true);
+                eventSrcs[i] = eventSrc;
+            }
 
             if (cl.hasOption("hp")) {
-                PortRangeSet portsRanges = eventSrc.getPortsRange();
-                portsRanges.clear();
+                for (int i = 0; i < clArgs.length; i++) {
+                    PcapSessionEventSource eventSrc = eventSrcs[i];
+                    PortRangeSet portsRanges = eventSrc.getPortsRange();
+                    portsRanges.clear();
 
-                String[] ports = cl.getOptionValues("hp");
-                for (String port : ports) {
-                    portsRanges.add(port);
+                    String[] ports = cl.getOptionValues("hp");
+                    for (String port : ports) {
+                        portsRanges.add(port);
+                    }
                 }
             }
 
@@ -333,7 +345,10 @@
             }
 
             collector.addHandler(summMetricsHandler);
-            collector.register(eventSrc);
+            for (int i = 0; i < clArgs.length; i++) {
+                collector.register(eventSrcs[i]);
+            }
+
             collector.register(emitter);
             collector.register(client);
             collector.start();
@@ -341,15 +356,21 @@
             startTime = System.currentTimeMillis();
             if (cl.hasOption("ca")) {
                 MemoryEventsCache cache = new MemoryEventsCache(client);
-                eventSrc.setHandler(cache);
-                eventSrc.start();
+                for (int i = 0; i < clArgs.length; i++) {
+                    PcapSessionEventSource eventSrc = eventSrcs[i];
+                    eventSrc.setHandler(cache);
+                    eventSrc.start();
+                }
                 cache.setLoop(loops);
                 cache.await();
                 cache.send();
             } else {
-                eventSrc.setHandler(client);
-                eventSrc.setLoops(loops);
-                eventSrc.start();
+                for (int i = 0; i < clArgs.length; i++) {
+                    PcapSessionEventSource eventSrc = eventSrcs[i];
+                    eventSrc.setHandler(client);
+                    eventSrc.setLoops(loops);
+                    eventSrc.start();
+                }
             }
 
             ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@@ -363,7 +384,10 @@
             collector.collect();
 
             executor.shutdownNow();
-            eventSrc.stop();
+            for (int i = 0; i < clArgs.length; i++) {
+                eventSrcs[i].stop();
+            }
+
             client.stop();
             emitter.stop();
             if (reporterClient != null) {
--- a/stress-tester/src/main/java/com/passus/st/client/AbstractEvent.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/AbstractEvent.java	Tue Sep 12 10:14:05 2017 +0200
@@ -12,9 +12,12 @@
 
     private final int type;
 
-    public AbstractEvent(int type) {
+    protected final String sourceName;
+
+    public AbstractEvent(int type, String sourceName) {
         timestamp = TimeGenerator.getDefaultGenerator().currentTimeMillis();
         this.type = type;
+        this.sourceName = sourceName;
     }
 
     @Override
@@ -32,4 +35,9 @@
         return type;
     }
 
+    @Override
+    public String getSourceName() {
+        return sourceName;
+    }
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/DataEvents.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/DataEvents.java	Tue Sep 12 10:14:05 2017 +0200
@@ -13,19 +13,12 @@
 
         public static final int TYPE = 2;
 
-        private final String sourceName;
-
         public DataEnd() {
             this("");
         }
 
         public DataEnd(String sourceName) {
-            super(TYPE);
-            this.sourceName = sourceName;
-        }
-
-        public String getSourceName() {
-            return sourceName;
+            super(TYPE, sourceName);
         }
 
         @Override
@@ -42,26 +35,18 @@
 
         public static final int LOOP_NUM_UNDEFINED = -1;
 
-        private final String sourceName;
-
         private int loopNum;
 
         public DataLoopEnd(String sourceName) {
-            super(TYPE);
-            this.sourceName = sourceName;
+            super(TYPE, sourceName);
             this.loopNum = LOOP_NUM_UNDEFINED;
         }
 
         public DataLoopEnd(String sourceName, int loopNum) {
-            super(TYPE);
-            this.sourceName = sourceName;
+            super(TYPE, sourceName);
             this.loopNum = loopNum;
         }
 
-        public String getSourceName() {
-            return sourceName;
-        }
-
         public int getLoopNum() {
             return loopNum;
         }
--- a/stress-tester/src/main/java/com/passus/st/client/Event.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/Event.java	Tue Sep 12 10:14:05 2017 +0200
@@ -7,9 +7,11 @@
 public interface Event {
 
     public int getType();
-    
+
     public long getTimestamp();
-    
+
     public void setTimestamp(long timestamp);
-    
+
+    public String getSourceName();
+
 }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionEvent.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionEvent.java	Tue Sep 12 10:14:05 2017 +0200
@@ -11,13 +11,16 @@
 
     private final SessionInfo sessionInfo;
 
+    private final String sourceName;
+
     private long timestamp;
 
-    public SessionEvent(SessionInfo sessionInfo) {
+    public SessionEvent(SessionInfo sessionInfo, String sourceName) {
         this.sessionInfo = sessionInfo;
         timestamp = TimeGenerator.getDefaultGenerator().currentTimeMillis();
+        this.sourceName = sourceName;
     }
-    
+
     public SessionInfo getSessionInfo() {
         return sessionInfo;
     }
@@ -32,5 +35,10 @@
         this.timestamp = timestamp;
     }
 
+    @Override
+    public String getSourceName() {
+        return sourceName;
+    }
+
     public abstract SessionEvent instanceForWorker(int index);
 }
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Tue Sep 12 10:14:05 2017 +0200
@@ -10,8 +10,8 @@
 
     private final T payload;
 
-    public SessionPayloadEvent(SessionInfo sessionInfo, T payload) {
-        super(sessionInfo);
+    public SessionPayloadEvent(SessionInfo sessionInfo, T payload, String sourceName) {
+        super(sessionInfo, sourceName);
         this.payload = payload;
     }
 
--- a/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionStatusEvent.java	Tue Sep 12 10:14:05 2017 +0200
@@ -18,12 +18,16 @@
     private final int status;
 
     private SessionStatusEvent() {
-        super(null);
+        super(null, null);
         status = 0;
     }
 
     public SessionStatusEvent(SessionInfo sessionInfo, int status) {
-        super(sessionInfo);
+        this(sessionInfo, status, null); // no source name supplied: delegate to the three-argument constructor with null
+    }
+    
+    public SessionStatusEvent(SessionInfo sessionInfo, int status, String sourceName) {
+        super(sessionInfo, sourceName);
 
         if (status < STATUS_OPENING || status > STATUS_CLOSED) {
             throw new IllegalArgumentException("Invalid status '" + status + "'.");
@@ -66,7 +70,7 @@
 
     @Override
     public SessionStatusEvent instanceForWorker(int index) {
-        return new SessionStatusEvent(getSessionInfo(), status);
+        return new SessionStatusEvent(getSessionInfo(), status, getSourceName());
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpAsynchClientWorker.java	Tue Sep 12 10:14:05 2017 +0200
@@ -179,7 +179,7 @@
                     HttpResponse resp = payloadEvent.getResponse();
                     HttpRequest req = payloadEvent.getRequest();
                     long respTime = time + (resp.getTimestamp() - req.getTimestamp());
-                    HttpResponseEvent respEvent = new HttpResponseEvent(session, resp, respTime);
+                    HttpResponseEvent respEvent = new HttpResponseEvent(session, event.getSourceName(), resp, respTime);
 
                     task.events.add(respEvent);
                 }
@@ -366,33 +366,12 @@
         }
     }
 
-    private static final class HttpRequestEvent extends SessionPayloadEvent<HttpRequest> {
-
-        public static final int TYPE = 1011;
-
-        public HttpRequestEvent(SessionInfo sessionInfo, HttpRequest payload, long timestamp) {
-            super(sessionInfo, payload);
-            setTimestamp(timestamp);
-        }
-
-        @Override
-        public SessionEvent instanceForWorker(int index) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        @Override
-        public int getType() {
-            return TYPE;
-        }
-
-    }
-
     private static final class HttpResponseEvent extends SessionPayloadEvent<HttpResponse> {
 
         public static final int TYPE = 1012;
 
-        public HttpResponseEvent(SessionInfo sessionInfo, HttpResponse payload, long timestamp) {
-            super(sessionInfo, payload);
+        public HttpResponseEvent(SessionInfo sessionInfo, String sourceName, HttpResponse payload, long timestamp) {
+            super(sessionInfo, payload, sourceName);
             setTimestamp(timestamp);
         }
 
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClient.java	Tue Sep 12 10:14:05 2017 +0200
@@ -45,6 +45,8 @@
 
     private boolean connectPartialSession = false;
 
+    private HttpClientWorkerDispatcher dispatcher;
+
     public HttpClient(Emitter emitter) {
         Assert.notNull(emitter, "emitter");
         this.emitter = emitter;
@@ -100,6 +102,14 @@
         this.sleepFactor = sleepFactor;
     }
 
+    public HttpClientWorkerDispatcher getDispatcher() {
+        return dispatcher; // null unless a dispatcher was installed through setDispatcher
+    }
+
+    public void setDispatcher(HttpClientWorkerDispatcher dispatcher) {
+        this.dispatcher = dispatcher; // when left null, handle() passes every event to all workers instead of a single one
+    }
+
     public int getActiveConnections() {
         int count = 0;
         for (HttpClientWorker worker : workers) {
@@ -238,9 +248,21 @@
             LOGGER.trace("Event: {}", event);
         }
 
-        for (HttpClientWorker worker : workers) {
-            worker.handle(event);
+        try {
+            if (dispatcher == null) {
+                for (HttpClientWorker worker : workers) {
+                    worker.handle(event);
+                }
+            } else {
+                int index = dispatcher.dispatch(event, workers);
+                workers[index].handle(event);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(e.getMessage(), e);
+            }
         }
+
     }
 
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpClientWorkerDispatcher.java	Tue Sep 12 10:14:05 2017 +0200
@@ -1,16 +1,15 @@
 package com.passus.st.client.http;
 
-import com.passus.st.client.SessionEvent;
+import com.passus.st.client.Event;
 
 /**
  *
  * @author Mirosław Hawrot
  */
-@Deprecated
 public interface HttpClientWorkerDispatcher {
     
-    public abstract HttpClientWorker find(SessionEvent event, HttpClientWorker[] workers);
+    public abstract HttpClientWorker find(Event event, HttpClientWorker[] workers);
 
-    public abstract int dispatch(SessionEvent event, HttpClientWorker[] workers);
+    public abstract int dispatch(Event event, HttpClientWorker[] workers);
     
 }
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpFlowBasedClientWorker.java	Tue Sep 12 10:14:05 2017 +0200
@@ -7,7 +7,6 @@
 import com.passus.data.DataDecoder;
 import com.passus.data.HeapByteBuff;
 import com.passus.net.http.HttpFullMessageDecoder;
-import com.passus.net.http.HttpMethod;
 import com.passus.net.http.HttpRequest;
 import com.passus.net.http.HttpRequestEncoder;
 import com.passus.net.http.HttpResponse;
--- a/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSessionPayloadEvent.java	Tue Sep 12 10:14:05 2017 +0200
@@ -15,12 +15,12 @@
 
     public static final int TYPE = 12;
 
-    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp) {
-        this(sessionInfo, new HttpReqResp(req, resp));
+    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpRequest req, HttpResponse resp, String sourceName) {
+        this(sessionInfo, new HttpReqResp(req, resp), sourceName);
     }
 
-    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload) {
-        super(sessionInfo, payload);
+    public HttpSessionPayloadEvent(SessionInfo sessionInfo, HttpReqResp payload, String sourceName) {
+        super(sessionInfo, payload, sourceName);
     }
 
     public HttpRequest getRequest() {
@@ -40,7 +40,7 @@
     public HttpSessionPayloadEvent instanceForWorker(int index) {
         HttpReqResp payload = getPayload();
         HttpRequest reqCopy = payload.request == null ? null : new HttpRequest(payload.request);
-        return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response);
+        return new HttpSessionPayloadEvent(getSessionInfo(), reqCopy, payload.response, getSourceName());
     }
 
     public static class HttpReqResp {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/client/http/HttpSourceNameAwareClientWorkerDispatcher.java	Tue Sep 12 10:14:05 2017 +0200
@@ -0,0 +1,41 @@
+package com.passus.st.client.http;
+
+import com.passus.st.client.Event;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dispatcher that binds each distinct event source name to one worker index, in order of first appearance.
+ * @author Mirosław Hawrot
+ */
+public class HttpSourceNameAwareClientWorkerDispatcher implements HttpClientWorkerDispatcher {
+
+    private final List<String> sources = new ArrayList<>(4); // list position of a name == worker index for that source
+
+    @Override
+    public HttpClientWorker find(Event event, HttpClientWorker[] workers) {
+        int index = dispatch(event, workers); // same lookup/registration rules (and exceptions) as dispatch
+        return workers[index];
+    }
+
+    @Override
+    public int dispatch(Event event, HttpClientWorker[] workers) { // NOTE(review): reads and mutates 'sources' without synchronization -- confirm callers are single-threaded
+        String sourceName = event.getSourceName();
+        if (sourceName == null) {
+            throw new IllegalArgumentException("Event '" + event.getClass() + "' source name cannot be null.");
+        }
+
+        int index = sources.indexOf(sourceName); // -1 when this source name has not been seen before
+        if (index == -1) {
+            if (sources.size() == workers.length) { // every worker index is already bound to another source
+                throw new IllegalArgumentException("Too low workers number.");
+            }
+
+            sources.add(sourceName);
+            index = sources.size() - 1; // the newly registered source takes the next free index
+        }
+
+        return index;
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/client/http/ModuloHttpClientWorkerDispatcher.java	Mon Sep 11 12:17:48 2017 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,24 +0,0 @@
-package com.passus.st.client.http;
-
-import com.passus.st.client.SessionEvent;
-
-/**
- *
- * @author Mirosław Hawrot
- */
-@Deprecated
-public class ModuloHttpClientWorkerDispatcher implements HttpClientWorkerDispatcher {
-
-    @Override
-    public HttpClientWorker find(SessionEvent event, HttpClientWorker[] workers) {
-        int index = dispatch(event, workers);
-        return workers[index];
-    }
-
-    @Override
-    public int dispatch(SessionEvent event, HttpClientWorker[] workers) {
-        int hashCode = (event.getSessionInfo().hashCode()) & 0x7fffffff;
-        return hashCode % workers.length;
-    }
-
-}
--- a/stress-tester/src/main/java/com/passus/st/emitter/DynamicSessionMapper.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/DynamicSessionMapper.java	Tue Sep 12 10:14:05 2017 +0200
@@ -1,7 +1,6 @@
 package com.passus.st.emitter;
 
 import com.passus.net.IpAddress;
-import com.passus.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
--- a/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/SessionInfo.java	Tue Sep 12 10:14:05 2017 +0200
@@ -6,6 +6,7 @@
 import com.passus.net.session.SessionBean;
 import com.passus.net.session.SessionUtils;
 import java.text.ParseException;
+import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -32,6 +33,8 @@
 
     private final int hashCode;
 
+    private String sourceName;
+
     public SessionInfo(String srcSocket, String dstSocket) throws ParseException {
         this(new SocketAddress(srcSocket), new SocketAddress(dstSocket));
     }
@@ -67,6 +70,14 @@
         this.hashCode = (srcPort + dstPort << 16) ^ srcIp.hashCode() ^ dstIp.hashCode();
     }
 
+    public String getSourceName() {
+        return sourceName; // stays null until setSourceName is called; the field has no initializer
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName; // this value is compared in equals() and appended to toString() when non-null
+    }
+
     public int getTransport() {
         return transport;
     }
@@ -110,7 +121,8 @@
         }
 
         final SessionInfo other = (SessionInfo) obj;
-        return transport == transport
+        return Objects.equals(sourceName, other.sourceName)
+                && transport == other.transport
                 && srcPort == other.srcPort && dstPort == other.dstPort
                 && srcIp.equals(other.srcIp) && dstIp.equals(other.dstIp);
     }
@@ -124,6 +136,10 @@
         sb.append("<->");
         sb.append(dstIp).append(":").append(dstPort);
 
+        if (sourceName != null) {
+            sb.append(" ").append(sourceName);
+        }
+
         return sb.toString();
     }
 
--- a/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Mon Sep 11 12:17:48 2017 +0200
+++ b/stress-tester/src/main/java/com/passus/st/source/PcapSessionEventSource.java	Tue Sep 12 10:14:05 2017 +0200
@@ -4,7 +4,7 @@
 import com.passus.commons.annotations.Plugin;
 import com.passus.commons.service.ServiceException;
 import com.passus.commons.time.CustomTimeGenerator;
-import com.passus.commons.utils.StringUtils;
+import com.passus.commons.utils.UniqueIdGenerator;
 import com.passus.config.Configuration;
 import com.passus.net.Frame;
 import com.passus.net.MemoryFrame;
@@ -91,7 +91,7 @@
     public PcapSessionEventSource() {
         portsRange.add(80);
         portsRange.add(8080);
-        this.name = StringUtils.randomString();
+        this.name = UniqueIdGenerator.generate();
     }
 
     @Override
@@ -528,8 +528,9 @@
                         context.getSrcIpAddr(), context.getSrcPort(),
                         context.getDstIpAddr(), context.getDstPort(),
                         context.getProtocol(), context.getId());
-
-                Event event = new HttpSessionPayloadEvent(info, req, (HttpResponse) message);
+                info.setSourceName(name);
+                
+                Event event = new HttpSessionPayloadEvent(info, req, (HttpResponse) message, name);
                 event.setTimestamp(timestamp);
                 eventHandler.handle(event);
                 if (collectMetric) {
@@ -555,8 +556,9 @@
                     context.getSrcIpAddr(), context.getSrcPort(),
                     context.getDstIpAddr(), context.getDstPort(),
                     context.getProtocol(), context.getId());
-
-            Event event = new SessionStatusEvent(info, sessionInfoStatus);
+            info.setSourceName(name);
+            
+            Event event = new SessionStatusEvent(info, sessionInfoStatus, name);
             event.setTimestamp(timestamp);
             eventHandler.handle(event);
         }
@@ -570,7 +572,7 @@
         private final Ip ip;
 
         public IpEvent(Ip ip) {
-            super(TYPE);
+            super(TYPE, null);
             this.ip = ip;
         }