changeset 975:1d6458a1c1da

NioAbstractEmitterWorker - unidirectional traffic support
author Devel 2
date Thu, 25 Jul 2019 10:20:36 +0200
parents e3532e4a84fe
children aff81768741e
files stress-tester/src/main/java/com/passus/st/CliHelper.java stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java
diffstat 6 files changed, 31 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/CliHelper.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/CliHelper.java	Thu Jul 25 10:20:36 2019 +0200
@@ -153,7 +153,7 @@
         executor.setEmitter(emitter);
         executor.setCollectMetrics(true);
         executor.setConnectPartialSession(cl.hasOption("ps"));
-        executor.setWokerType(cl.getOptionValue("wt", "synch"));
+        executor.setWorkerType(cl.getOptionValue("wt", "synch"));
 
         //TODO Wyprowadzic parametr
         /*if (cl.hasOption("rs")) {
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/Protocols.java	Thu Jul 25 10:20:36 2019 +0200
@@ -17,7 +17,7 @@
             case DNS:
                 return "DNS";
             case NETFLOW:
-                return "NETFLOW";
+                return "Netflow";
             default:
                 return "unknown";
         }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java	Thu Jul 25 10:20:36 2019 +0200
@@ -47,7 +47,7 @@
 
     //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP;
 
-    private String wokerType = "synch";
+    private String workerType = "synch";
 
     private int workersNum = DEFAULT_WORKERS_NUM;
 
@@ -88,18 +88,18 @@
         filters.forEach((filter) -> filterChain.addFilter(filter));
     }
 
-    public String getWokerType() {
-        return wokerType;
+    public String getWorkerType() {
+        return workerType;
     }
 
-    public void setWokerType(String wokerType) {
-        Assert.notNull(wokerType, "wokerType");
+    public void setWorkerType(String workerType) {
+        Assert.notNull(workerType, "workerType");
 
-        if (!FlowWorkerFactory.getInstance().containsName(wokerType)) {
-            throw new IllegalArgumentException("Unknwon worker type '" + wokerType + "'.");
+        if (!FlowWorkerFactory.getInstance().containsName(workerType)) {
+            throw new IllegalArgumentException("Unknown worker type '" + workerType + "'.");
         }
 
-        this.wokerType = wokerType;
+        this.workerType = workerType;
     }
 
     public int getWorkersNum() {
@@ -231,9 +231,9 @@
 
         for (int i = 0; i < workersNum; i++) {
             Object[] initArgs = {emitter, threadName, i};
-            FlowWorker worker = factory.getInstanceByName(wokerType, parameterTypes, initArgs);
+            FlowWorker worker = factory.getInstanceByName(workerType, parameterTypes, initArgs);
             if (worker == null) {
-                throw new ServiceException("Unable to create instance of worker '" + wokerType + "'");
+                throw new ServiceException("Unable to create instance of worker '" + workerType + "'");
             }
 
             worker.setListener(listener);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java	Thu Jul 25 10:20:36 2019 +0200
@@ -451,6 +451,9 @@
                 }
 
                 flowContext.client().onDataWriteEnd(flowContext);
+                if (!flowContext.isBidirectional()) {
+                    changeFlowState(flowContext, STATE_RESP_RECEIVED);
+                }
             }
 
             lock.notifyAll();
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Thu Jul 25 10:20:36 2019 +0200
@@ -2,6 +2,8 @@
 
 import com.passus.st.emitter.SessionInfo;
 
+import static com.passus.st.Protocols.protocolToString;
+
 /**
  * @author Mirosław Hawrot
  */
@@ -43,4 +45,13 @@
     public SessionPayloadEvent instanceForWorker(int index) {
         return new SessionPayloadEvent(getSessionInfo(), request, response, protocolId, getSourceName());
     }
+
+    @Override
+    public String toString() {
+        return "SessionPayloadEvent{" +
+                "request=" + request +
+                ", response=" + response +
+                ", protocol=" + protocolToString(protocolId) +
+                '}';
+    }
 }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Wed Jul 24 14:43:40 2019 +0200
+++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java	Thu Jul 25 10:20:36 2019 +0200
@@ -297,6 +297,11 @@
                     keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress());
         }
 
+        if (channelContext.isBidirectional()) {
+            setOpRead(key);
+            clearOpWrite(key);
+        }
+
         //TODO Operacje na handlerach powinny przechodzic przez Executor
         try {
             keyContext.handler.dataWritten(keyContext.channelContext);
@@ -304,12 +309,6 @@
         } catch (Exception e) {
             logger.debug(e.getMessage(), e);
         }
-
-        if (channelContext.isBidirectional()) {
-            setOpRead(key);
-        }
-
-        clearOpWrite(key);
     }
 
     protected void doRead(SelectionKey key) {