Mercurial > stress-tester
changeset 975:1d6458a1c1da
NioAbstractEmitterWorker - unidirectional traffic support
author | Devel 2 |
---|---|
date | Thu, 25 Jul 2019 10:20:36 +0200 |
parents | e3532e4a84fe |
children | aff81768741e |
files | stress-tester/src/main/java/com/passus/st/CliHelper.java stress-tester/src/main/java/com/passus/st/Protocols.java stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java |
diffstat | 6 files changed, 31 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/CliHelper.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/CliHelper.java Thu Jul 25 10:20:36 2019 +0200 @@ -153,7 +153,7 @@ executor.setEmitter(emitter); executor.setCollectMetrics(true); executor.setConnectPartialSession(cl.hasOption("ps")); - executor.setWokerType(cl.getOptionValue("wt", "synch")); + executor.setWorkerType(cl.getOptionValue("wt", "synch")); //TODO Wyprowadzic parametr /*if (cl.hasOption("rs")) {
--- a/stress-tester/src/main/java/com/passus/st/Protocols.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/Protocols.java Thu Jul 25 10:20:36 2019 +0200 @@ -17,7 +17,7 @@ case DNS: return "DNS"; case NETFLOW: - return "NETFLOW"; + return "Netflow"; default: return "unknown"; }
--- a/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowExecutor.java Thu Jul 25 10:20:36 2019 +0200 @@ -47,7 +47,7 @@ //private float sleepFactor = FlowWorker.SLEEP_FACTOR_NO_SLEEP; - private String wokerType = "synch"; + private String workerType = "synch"; private int workersNum = DEFAULT_WORKERS_NUM; @@ -88,18 +88,18 @@ filters.forEach((filter) -> filterChain.addFilter(filter)); } - public String getWokerType() { - return wokerType; + public String getWorkerType() { + return workerType; } - public void setWokerType(String wokerType) { - Assert.notNull(wokerType, "wokerType"); + public void setWorkerType(String workerType) { + Assert.notNull(workerType, "workerType"); - if (!FlowWorkerFactory.getInstance().containsName(wokerType)) { - throw new IllegalArgumentException("Unknwon worker type '" + wokerType + "'."); + if (!FlowWorkerFactory.getInstance().containsName(workerType)) { + throw new IllegalArgumentException("Unknown worker type '" + workerType + "'."); } - this.wokerType = wokerType; + this.workerType = workerType; } public int getWorkersNum() { @@ -231,9 +231,9 @@ for (int i = 0; i < workersNum; i++) { Object[] initArgs = {emitter, threadName, i}; - FlowWorker worker = factory.getInstanceByName(wokerType, parameterTypes, initArgs); + FlowWorker worker = factory.getInstanceByName(workerType, parameterTypes, initArgs); if (worker == null) { - throw new ServiceException("Unable to create instance of worker '" + wokerType + "'"); + throw new ServiceException("Unable to create instance of worker '" + workerType + "'"); } worker.setListener(listener);
--- a/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/FlowWorkerBase.java Thu Jul 25 10:20:36 2019 +0200 @@ -451,6 +451,9 @@ } flowContext.client().onDataWriteEnd(flowContext); + if (!flowContext.isBidirectional()) { + changeFlowState(flowContext, STATE_RESP_RECEIVED); + } } lock.notifyAll();
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Thu Jul 25 10:20:36 2019 +0200 @@ -2,6 +2,8 @@ import com.passus.st.emitter.SessionInfo; +import static com.passus.st.Protocols.protocolToString; + /** * @author Mirosław Hawrot */ @@ -43,4 +45,13 @@ public SessionPayloadEvent instanceForWorker(int index) { return new SessionPayloadEvent(getSessionInfo(), request, response, protocolId, getSourceName()); } + + @Override + public String toString() { + return "SessionPayloadEvent{" + + "request=" + request + + ", response=" + response + + ", protocol=" + protocolToString(protocolId) + + '}'; + } }
--- a/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Wed Jul 24 14:43:40 2019 +0200 +++ b/stress-tester/src/main/java/com/passus/st/emitter/nio/NioAbstractEmitterWorker.java Thu Jul 25 10:20:36 2019 +0200 @@ -297,6 +297,11 @@ keyContext.channelContext.getLocalAddress(), keyContext.channelContext.getRemoteAddress()); } + if (channelContext.isBidirectional()) { + setOpRead(key); + clearOpWrite(key); + } + //TODO Operacje na handlerach powinny przechodzic przez Executor try { keyContext.handler.dataWritten(keyContext.channelContext); @@ -304,12 +309,6 @@ } catch (Exception e) { logger.debug(e.getMessage(), e); } - - if (channelContext.isBidirectional()) { - setOpRead(key); - } - - clearOpWrite(key); } protected void doRead(SelectionKey key) {