changeset 1138:730898c04e80

SequenceFilterX -> SequenceFilter
author Devel 2
date Tue, 09 Jun 2020 12:38:40 +0200
parents 49969086dded
children 04d179a5e2bf
files stress-tester/src/main/java/com/passus/st/filter/DefaultFlowFilterFactory.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilter.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterActionNodeTransformer.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterStageNodeTransformer.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterValuesNodeTransformer.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterX.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXActionNodeTransformer.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXStageNodeTransformer.java stress-tester/src/test/java/com/passus/st/filter/SequenceFilterTest.java stress-tester/src/test/resources/com/passus/st/client/http/filter/sequence.yml
diffstat 10 files changed, 743 insertions(+), 1039 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/filter/DefaultFlowFilterFactory.java	Tue Jun 09 12:00:22 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/filter/DefaultFlowFilterFactory.java	Tue Jun 09 12:38:40 2020 +0200
@@ -44,7 +44,11 @@
         HttpFilterResponseWrapper origResp;
         if (context != null) {
             httpContext = context.getParamValue(HttpFlowConst.PARAM_HTTP_CONTEXT);
-            origResp = (HttpFilterResponseWrapper) createHttpMessageWrapper(httpContext.origResponse());
+            if (httpContext != null) {
+                origResp = (HttpFilterResponseWrapper) createHttpMessageWrapper(httpContext.origResponse());
+            } else {
+                origResp = null;
+            }
         } else {
             httpContext = null;
             origResp = null;
--- a/stress-tester/src/main/java/com/passus/st/filter/SequenceFilter.java	Tue Jun 09 12:00:22 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/filter/SequenceFilter.java	Tue Jun 09 12:38:40 2020 +0200
@@ -3,231 +3,55 @@
 import com.passus.commons.Assert;
 import com.passus.commons.annotations.Plugin;
 import com.passus.commons.metric.MapMetric;
-import com.passus.commons.service.Registry;
+import com.passus.commons.metric.Metric;
 import com.passus.commons.time.SystemTimeGenerator;
 import com.passus.commons.time.TimeAware;
 import com.passus.commons.time.TimeGenerator;
-import com.passus.config.*;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
 import com.passus.config.annotations.NodeDefinitionCreate;
 import com.passus.config.schema.*;
-import com.passus.config.validation.Errors;
 import com.passus.filter.ValueExtractor;
-import com.passus.filter.ValueExtractorParser;
-import com.passus.filter.config.PredicateNodeTransformer;
 import com.passus.st.client.FlowContext;
-import com.passus.st.client.http.ReporterDestination;
+import com.passus.st.client.SessionPayloadEvent;
+import com.passus.st.client.http.filter.HttpFilterRequestWrapper;
+import com.passus.st.client.http.filter.HttpFilterResponseWrapper;
+import com.passus.st.config.HeaderOperationNodeDefinition;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.metric.MetricSource;
+import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
 
 import java.io.Serializable;
-import java.text.ParseException;
 import java.util.*;
 import java.util.function.Predicate;
 
 import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
 import static com.passus.st.config.CommonNodeDefs.*;
 
-/**
- * @author mikolaj.podbielski
- */
 @NodeDefinitionCreate(SequenceFilter.NodeDefCreator.class)
 @Plugin(name = SequenceFilter.TYPE, category = PluginConstants.CATEGORY_FLOW_FILTER)
-public class SequenceFilter implements FlowFilter, TimeAware {
-
-    public static class SequenceItem {
-
-        private final Predicate predicate;
-        private boolean mustOccur = true;
-        private long time = 10_000;
-        private int num;
-        private String alias;
-        private String[] aliases;
-
-        public SequenceItem(Predicate predicate, int num) {
-            this(predicate);
-            this.num = num;
-        }
-
-        public SequenceItem(Predicate predicate) {
-            if (predicate == null) {
-                throw new NullPointerException();
-            }
-            this.predicate = predicate;
-        }
-
-        public Predicate getPredicate() {
-            return predicate;
-        }
-
-        public int getNum() {
-            return num;
-        }
-
-        public void setNum(int num) {
-            this.num = num;
-        }
-
-        public long getTime() {
-            return time;
-        }
-
-        public void setTime(long time) {
-            if (time <= 0) {
-                throw new IllegalArgumentException("Time must be greater than zero.");
-            }
-            this.time = time;
-        }
-
-        public String getAlias() {
-            return alias;
-        }
-
-        public void setAlias(String alias) {
-            this.alias = alias;
-            aliases = null;
-        }
-
-        public boolean isMustOccur() {
-            return mustOccur;
-        }
-
-        public void setMustOccur(boolean mustOccur) {
-            this.mustOccur = mustOccur;
-        }
-
-        private boolean match(Map<String, Object> value) {
-            return predicate.test(value);
-        }
-
-        private String[] getAliases() {
-            if (aliases == null) {
-                if (alias != null) {
-                    aliases = new String[2];
-                    aliases[1] = alias;
-                } else {
-                    aliases = new String[1];
-                }
-
-                aliases[0] = "_i" + num;
-            }
-
-            return aliases;
-        }
-    }
-
-    private static class SeqChainItem {
-
-        private final SequenceItem seq;
-        private SeqChainItem next;
-        private final long startTime;
-        private final long endTime;
-        private boolean active = true;
-
-        public SeqChainItem(SequenceItem seq, long startTime) {
-            this.seq = seq;
-            this.startTime = startTime;
-            this.endTime = startTime + seq.time;
-        }
-
-        public void setNext(SeqChainItem next) {
-            this.next = next;
-        }
-
-        public long getStartTime() {
-            return startTime;
-        }
-
-        public long getEndTime() {
-            return endTime;
-        }
-
-        public boolean match(Map<String, Object> value) {
-            return seq.match(value);
-        }
-
-        public SeqChainItem getNext() {
-            return next;
-        }
-
-        public boolean inTimeRange(long time) {
-            return (startTime <= time && time < endTime);
-        }
-    }
-
-    private static class SeqChain {
-
-        private SeqChainItem seqItem;
-
-        private final Map<String, Object> valueMap;
-//        private Filterable value = new FilterableMap();
-//        private FlowMarker flowMarker;
-
-        public SeqChain(SequenceItem[] seqItems, long startTime, Map<String, Object> valueMap) {
-            this.valueMap = valueMap;
-
-            SeqChainItem curItem = new SeqChainItem(seqItems[1], startTime);
-            seqItem = curItem;
-
-            for (int i = 2; i < seqItems.length; i++) {
-                SeqChainItem nexItem = new SeqChainItem(seqItems[i], curItem.getStartTime());
-                curItem.setNext(nexItem);
-                curItem = nexItem;
-            }
-        }
-
-        public void updateValue(MessageWrapper wrapper) {
-            valueMap.put("req", wrapper.getReq());
-            valueMap.put("resp", wrapper.getResp());
-        }
-
-        public void persistsValue(MessageWrapper wrapper, String[] aliases) {
-            for (String alias : aliases) {
-                valueMap.put(alias, wrapper);
-            }
-        }
-
-        public boolean hasNext() {
-            return (seqItem != null && seqItem.getNext() != null);
-        }
-
-        public boolean next() {
-            if (seqItem == null) {
-                return false;
-            }
-
-            seqItem = seqItem.getNext();
-            return true;
-        }
-
-        public boolean rewind(long time) {
-            if (seqItem == null) {
-                return false;
-            }
-
-            while (hasNext()) {
-                if (seqItem.inTimeRange(time)) {
-                    return true;
-                } else if (seqItem.seq.mustOccur) {
-                    return false;
-                }
-
-                next();
-            }
-
-            return false;
-        }
-    }
+public class SequenceFilter implements FlowFilter, TimeAware, MetricSource {
 
     public static final String TYPE = "sequence";
 
-    private final List<SeqChain> chains = new ArrayList<>();
-    private SequenceListener listener = (e) -> {
-    };
-    private SequenceItem[] seqItems;
-    private Map<String, ValueExtractor> values = Collections.EMPTY_MAP;
+    private static final int DEFAULT_CHAINS_NUM = 32;
+
+    private static final String STAGE_PREFIX = "_i";
+
+    private TimeGenerator timeGenerator = new SystemTimeGenerator();
+
+    private final List<StageChain> chains = new ArrayList<>(DEFAULT_CHAINS_NUM);
 
     private final FlowFilterFactory filterFactory;
 
-    private TimeGenerator timeGenerator = new SystemTimeGenerator();
+    private Stage rootStage;
+
+    private FilterDirection direction = FilterDirection.IN;
+
+    private Action[] actions;
+
+    private List<Metric> metrics;
 
     public SequenceFilter() {
         this(FlowFilterFactory.DEFAULT_FACTORY);
@@ -238,28 +62,64 @@
         this.filterFactory = filterFactory;
     }
 
-    public SequenceListener getListener() {
-        return listener;
-    }
+    /**
+     * Links the stages into a chain, giving each its index-based alias, and hands
+     * the shared metrics list / stage count to the actions that use them.
+     * @throws IllegalArgumentException for fewer than two stages or zero actions
+     */
+    public void init(final Stage[] stages, final Action[] actions) {
+        Assert.notNull(stages, "stages");
+        Assert.notNull(actions, "actions");
+        if (stages.length < 2) {
+            throw new IllegalArgumentException("At least two Stage required.");
+        } else if (actions.length == 0) { // was stages.length == 0, which could never be true here
+            throw new IllegalArgumentException("At least one Action required.");
+        }
 
-    public void setListener(SequenceListener listener) {
-        this.listener = listener;
+        Stage lastStage = null;
+        for (int i = 0; i < stages.length; i++) {
+            Stage stage = stages[i];
+            if (stage == null) {
+                throw new NullPointerException("One Stage is null.");
+            }
+            stage.index(i);
+            if (lastStage != null) {
+                lastStage.next = stage;
+            }
+
+            lastStage = stage;
+        }
+
+        for (Action action : actions) {
+            if (action instanceof CreateMetricsAction) {
+                if (metrics == null) {
+                    metrics = new ArrayList<>();
+                }
+                ((CreateMetricsAction) action).metrics = metrics;
+            } else if (action instanceof MultiplicationAction) {
+                ((MultiplicationAction) action).stagesNum = stages.length;
+            }
+        }
+
+        this.rootStage = stages[0]; // non-null: every element was checked in the loop above
+        this.actions = actions;
     }
 
-    public SequenceItem[] getSeqItems() {
-        return seqItems;
+    public FilterDirection getDirection() {
+        return direction;
     }
 
-    public void setSeqItems(SequenceItem[] seqItems) {
-        this.seqItems = seqItems;
+    public void setDirection(FilterDirection direction) {
+        Assert.notNull(direction, "direction");
+        this.direction = direction;
     }
 
-    public Map<String, ValueExtractor> getValues() {
-        return values;
+    Stage getRootStage() {
+        return rootStage;
     }
 
-    public void setValues(Map<String, ValueExtractor> values) {
-        this.values = values;
+    Action[] getActions() {
+        return actions;
     }
 
     @Override
@@ -268,215 +128,350 @@
     }
 
     @Override
-    public void setTimeGenerator(TimeGenerator timeGenerator) {
-        Assert.notNull(timeGenerator, "timeGenerator");
-        this.timeGenerator = timeGenerator;
+    public void setTimeGenerator(TimeGenerator generator) {
+        Assert.notNull(generator, "generator");
+        this.timeGenerator = generator;
     }
 
-    private void processValue(long now, MessageWrapper value) {
-        Iterator<SeqChain> iterator = chains.iterator();
-        while (iterator.hasNext()) {
-            SeqChain chain = iterator.next();
-
-            if (chain.seqItem.startTime <= now) {
-                if (!chain.seqItem.inTimeRange(now) && !chain.rewind(now)) {
-                    chain.persistsValue(value, chain.seqItem.seq.getAliases());
-                    fireEvent(chain);
-                    iterator.remove();
-                } else if (chain.seqItem.active) {
-                    chain.updateValue(value);
+    @Override
+    public boolean isCollectMetrics() {
+        return true;
+    }
 
-                    if (chain.seqItem.match(chain.valueMap)) {
-                        if (!chain.seqItem.seq.mustOccur) {
-                            iterator.remove();
+    @Override
+    public void setCollectMetrics(boolean collectMetrics) {
+
+    }
+
+    @Override
+    public void writeMetrics(MetricsContainer container) {
+        if (metrics != null) { // stays null until init() sees a CreateMetricsAction
+            synchronized (metrics) { // CreateMetricsAction.execute() locks the same list while appending
+                for (Metric metric : metrics) {
+                    container.update(metric);
+                }
+                metrics.clear(); // flushed metrics are dropped so they are not written again
+            }
+        }
+    }
+
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        List<Stage> stagesList = (List<Stage>) config.get("sequence");
+        Stage[] stagesArr = stagesList.toArray(new Stage[0]);
+        // NOTE(review): the schema built by NodeDefCreator declares an optional "dir" key, but it is
+        // not read here, so direction keeps its default unless setDirection() is called - confirm.
+        List<Action> actionsList = (List<Action>) config.get("action");
+        Action[] actionsArr = actionsList.toArray(new Action[0]);
+
+        init(stagesArr, actionsArr);
+    }
+
+    @Override
+    public FlowFilter instanceForWorker(int index) {
+        SequenceFilter filter = new SequenceFilter(filterFactory); // keep the factory this filter was built with
+        filter.rootStage = rootStage; // stages and actions are shared with the copy, not cloned
+        filter.actions = actions;
+        filter.direction = direction;
+        return filter; // NOTE(review): metrics and timeGenerator are not propagated - confirm intended
+    }
+
+    private void fireAction(StageChain chain, FlowContext flowContext) { // called by nextStage() when no stage is left
+        for (Action action : actions) {
+            action.execute(chain, flowContext);
+        }
+    }
+
+    private boolean nextStage(StageChain chain, long now, MessageWrapper value, FlowContext context) {
+        if (value != null) {
+            chain.updateOldValue(value, chain.stage.getAliases()); // remember the value under the current stage's aliases
+        }
+        // Returns true when the chain moved on to another stage and must stay tracked.
+        if (chain.stage.hasNext()) {
+            chain.setStage(now, chain.stage.next);
+            return true;
+        }
+        // Last stage finished: run the actions; false tells the caller to drop the chain.
+        fireAction(chain, context);
+        return false;
+    }
+
+    // Feeds one value to every tracked chain (expire / match / advance), then tries to start a new chain.
+    private void processValue(long now, MessageWrapper value, FlowContext context) {
+        Iterator<StageChain> it = chains.iterator();
+        if (it.hasNext()) {
+            StageChain chain = it.next();
+            for (; ; ) {
+                if (chain.expire < now) {
+                    if (chain.stage.mustOccur) {
+                        // Required stage timed out: drop the chain and fall through to the iterator
+                        // advance below; looping back here would call it.remove() twice on one element.
+                        it.remove();
+                    } else if (!nextStage(chain, now, value, context)) {
+                        it.remove();
+                    } else {
+                        continue; // re-check the same chain against its new stage
+                    }
+                } else if (chain.stage.mustOccur) {
+                    Map<String, Object> filterable = chain.mergeValue(value);
+                    if (chain.stage.match(filterable)) {
+                        if (!nextStage(chain, now, value, context)) {
+                            it.remove();
+                        }
+                    }
+                } else if (!nextStage(chain, now, null, context)) {
+                    it.remove();
+                }
+
+                if (!it.hasNext()) break;
+                chain = it.next();
+            }
+        }
+        // Independently of the existing chains, a value matching the root stage starts a new one.
+        final Map<String, Object> valueMap = new HashMap<>(2);
+        valueMap.put("req", value.getReq());
+        valueMap.put("resp", value.getResp());
+        if (rootStage.match(valueMap)) {
+            StageChain chain = new StageChain(valueMap);
+            chain.setStage(now, rootStage);
+            if (nextStage(chain, now, value, context)) {
+                chains.add(chain);
+            }
+        }
+    }
+
+    @Override
+    public int filterOutbound(Object req, Object resp, FlowContext context) {
+        if (direction == FilterDirection.BOTH || direction == FilterDirection.OUT) { // skipped when direction is IN
+            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
+            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
+        }
+        // Returned whether or not the value was processed.
+        return DUNNO;
+    }
+
+    @Override
+    public int filterInbound(Object req, Object resp, FlowContext context) {
+        if (direction == FilterDirection.BOTH || direction == FilterDirection.IN) { // skipped when direction is OUT
+            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
+            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
+        }
+        // Returned whether or not the value was processed.
+        return DUNNO;
+    }
+
+
+    public static class Stage {
+        // Tested against a map that holds at least the "req" and "resp" entries.
+        private final Predicate predicate;
+        // true: the chain is dropped if this stage expires unmatched; false: the stage is passed without matching.
+        private final boolean mustOccur;
+        // Added to "now" to compute the chain's expiry when this stage becomes current.
+        private final long time;
+        // Optional user-chosen name; may be null.
+        private final String alias;
+        // [0] = STAGE_PREFIX + index (set by index()), [1] = alias when one was given.
+        private String[] aliases;
+        // Following stage in the sequence, or null for the last one; linked by SequenceFilter.init().
+        Stage next;
+
+        public Stage(Predicate predicate, long time, boolean mustOccur, String alias) {
+            this.predicate = predicate;
+            this.time = time;
+            this.mustOccur = mustOccur;
+            this.alias = alias;
+            // Slot 0 is reserved for the index alias and stays null until index() is called.
+            if (alias != null) {
+                aliases = new String[2];
+                aliases[1] = alias;
+            } else {
+                aliases = new String[1];
+            }
+        }
+        // Called by SequenceFilter.init() with the stage's position in the sequence.
+        private void index(int index) {
+            aliases[0] = STAGE_PREFIX + index;
+        }
+
+        public Predicate getPredicate() {
+            return predicate;
+        }
+
+        public boolean isMustOccur() {
+            return mustOccur;
+        }
+
+        public long getTime() {
+            return time;
+        }
+
+        public String getAlias() {
+            return alias;
+        }
+
+        private boolean hasNext() {
+            return next != null;
+        }
+
+        private boolean match(Map<String, Object> value) {
+            return predicate.test(value);
+        }
+
+        private String[] getAliases() {
+            return aliases;
+        }
+    }
+
+    private static final class StageChain {
+        // Stage the chain is currently waiting on.
+        private Stage stage;
+        // Starts with the root value's "req"/"resp"; matched values are added under their stage aliases.
+        private final Map<String, Object> valueMap;
+        // Time after which the current stage counts as expired (see setStage()).
+        private long expire;
+
+        public StageChain(Map<String, Object> valueMap) {
+            this.valueMap = valueMap;
+        }
+
+        public void setStage(long now, Stage stage) {
+            this.expire = now + stage.time;
+            this.stage = stage;
+        }
+        // View for predicates: everything stored so far, with "req"/"resp" taken from the new value.
+        public Map<String, Object> mergeValue(MessageWrapper newValue) {
+            Map<String, Object> out = new HashMap<>(valueMap); // copy, so the stored values stay untouched
+            out.put("req", newValue.getReq());
+            out.put("resp", newValue.getResp());
+            return out;
+        }
+        // Stores the value under each given alias so later stages and actions can read it.
+        public void updateOldValue(MessageWrapper newValue, String[] aliases) {
+            for (String alias : aliases) {
+                valueMap.put(alias, newValue);
+            }
+        }
+
+    }
+
+    public static abstract class Action {
+        // Invoked by the filter once a chain has passed its last stage.
+        public abstract void execute(StageChain chain, FlowContext flowContext);
+
+    }
+
+    public static final class CreateMetricsAction extends Action {
+        // Metric field name -> extractor evaluated against the chain's stored values.
+        private final Map<String, ValueExtractor> values;
+        // Wired by SequenceFilter.init(); execute() needs it to be non-null.
+        private List<Metric> metrics;
+
+        public CreateMetricsAction() {
+            this(Collections.EMPTY_MAP);
+        }
+
+        public CreateMetricsAction(Map<String, ValueExtractor> values) {
+            this.values = values;
+        }
+
+        Map<String, ValueExtractor> getValues() {
+            return values;
+        }
+        // Builds one "sequence" MapMetric from the extracted values and appends it to the shared list.
+        @Override
+        public void execute(StageChain chain, FlowContext flowContext) {
+            Map<String, Serializable> result;
+
+            if (values.isEmpty()) {
+                result = Collections.emptyMap();
+            } else {
+                result = new HashMap<>();
+                Map<String, Object> persisted = chain.valueMap;
+                for (Map.Entry<String, ValueExtractor> e : values.entrySet()) {
+                    Object value = e.getValue().extract(persisted);
+                    if (value instanceof Serializable) { // any other result is skipped
+                        result.put(e.getKey(), (Serializable) value);
+                    }
+                }
+            }
+
+            MapMetric metric = new MapMetric("sequence", result);
+            synchronized (metrics) { // writeMetrics() locks the same list while flushing
+                metrics.add(metric);
+            }
+        }
+    }
+
+    public static final class MultiplicationAction extends Action {
+
+        private final int multiplier;
+
+        private int stagesNum;
+
+        public MultiplicationAction(int multiplier) {
+            Assert.greaterThanZero(multiplier, "multiplier");
+            this.multiplier = multiplier;
+        }
+
+        public int getMultiplier() {
+            return multiplier;
+        }
+
+        @Override
+        public void execute(StageChain chain, FlowContext flowContext) {
+            for (int i = 0; i < multiplier; i++) {
+                for (int j = (stagesNum - 1); j >= 0; j--) {
+                    Object value = chain.valueMap.get(STAGE_PREFIX + j);
+                    if (value instanceof MessageWrapper) {
+                        MessageWrapper wrapper = (MessageWrapper) value;
+                        SessionInfo sessionInfo = flowContext.sessionInfo();
+
+                        SessionPayloadEvent<Object, Object> event;
+                        if (wrapper.req instanceof HttpFilterRequestWrapper || wrapper.resp instanceof HttpFilterResponseWrapper) {
+                            Object req = wrapper.req == null ? null : ((HttpFilterRequestWrapper) wrapper.req).getWrappedMessage();
+                            Object resp = wrapper.resp == null ? null : ((HttpFilterResponseWrapper) wrapper.resp).getWrappedMessage();
+                            event = new SessionPayloadEvent<>(sessionInfo, req, resp, sessionInfo.getProtocolId(), sessionInfo.getSourceName());
                         } else {
-                            chain.persistsValue(value, chain.seqItem.seq.getAliases());
-                            if (!chain.hasNext()) {
-                                fireEvent(chain);
-                                iterator.remove();
-                            } else if (!chain.next()) {
-                                fireEvent(chain);
-                                iterator.remove();
-                            }
+                            event = new SessionPayloadEvent<>(sessionInfo, wrapper.req, wrapper.resp, sessionInfo.getProtocolId(), sessionInfo.getSourceName());
                         }
+
+                        flowContext.queueAddFirst(event);
+                    } else {
+                        break;
                     }
                 }
             }
         }
-
-        final Map<String, Object> valueMap = new HashMap<>();
-        valueMap.put("req", value.getReq());
-        valueMap.put("resp", value.getResp());
-
-        if (seqItems[0].match(valueMap)) {
-            SeqChain chain = new SeqChain(seqItems, now, valueMap);
-            chain.persistsValue(value, seqItems[0].getAliases());
-            chains.add(chain);
-        }
-    }
-
-    private void fireEvent(SeqChain chain) {
-        Map<String, Serializable> result;
-
-        if (values.isEmpty()) {
-            result = Collections.emptyMap();
-        } else {
-            result = new HashMap<>();
-            Map<String, Object> persisted = chain.valueMap;
-            for (Map.Entry<String, ValueExtractor> e : values.entrySet()) {
-                Object value = e.getValue().extract(persisted);
-                if (value instanceof Serializable) {
-                    result.put(e.getKey(), (Serializable) value);
-                }
-            }
-        }
-
-        listener.sequenceDetected(new MapMetric("sequence", result));
     }
 
-    @Override
-    public int filterInbound(Object req, Object resp, FlowContext context) {
-        MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
-        processValue(timeGenerator.currentTimeMillis(), wrapper);
-        return DUNNO;
-    }
-
-    @Override
-    public void reset() {
-        chains.clear();
-    }
-
-    @Override
-    public SequenceFilter instanceForWorker(int index) {
-        SequenceFilter filter = new SequenceFilter();
-        filter.listener = listener;
-        filter.seqItems = seqItems;
-        filter.values = values;
-        return filter;
-    }
-
-    @Override
-    public void configure(Configuration config, ConfigurationContext context) {
-        List<SequenceItem> itemsList = (List<SequenceItem>) config.get("sequence");
-        seqItems = itemsList.toArray(new SequenceItem[itemsList.size()]);
-        for (int i = 0; i < seqItems.length; ++i) {
-            seqItems[i].setNum(i);
-        }
-
-        values = (Map<String, ValueExtractor>) config.get("values", Collections.EMPTY_MAP);
-        ReporterDestination reporterDestination = Registry.getInstance().get(
-                ReporterDestination.SERVICE_NAME, ReporterDestination.class);
-        if (reporterDestination != null) {
-            listener = reporterDestination;
-        }
-    }
-
-    public static class NodeDefCreator implements NodeDefinitionCreator {
+    public static final class NodeDefCreator implements NodeDefinitionCreator {
 
         @Override
         public NodeDefinition create() {
-            MapNodeDefinition elementDef = mapDef(
+            MapNodeDefinition stageDef = mapDef(
                     tupleDef("match", MSG_PREDICATE_DEF),
                     tupleDef("mustOccur", BOOLEAN_DEF).setRequired(false),
                     tupleDef("time", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                     tupleDef("alias", STRING_DEF).setRequired(false)
             );
-            elementDef.setTransformer(new SequencesNodeTransformer());
-            // TODO: mustOccur and time required in all steps but first
+            stageDef.setTransformer(new SequenceFilterStageNodeTransformer());
+
+            HeaderOperationNodeDefinition metricActionDef = new HeaderOperationNodeDefinition(
+                    valueDef()
+            );
+
+            KeyNameVaryListNodeDefinition actionsDef = new KeyNameVaryListNodeDefinition()
+                    .setNodeTransformer(new SequenceFilterActionNodeTransformer())
+                    .add("metric", new MappingNodeDefinition().setTransformer(new SequenceFilterValuesNodeTransformer()))
+                    .add("multiplication", INT_GREATER_THAN_ZERO_DEF);
+
 
             return mapDef(
-                    tupleDef("sequence", new ListNodeDefinition(elementDef).setMinNumberOfValues(2)),
-                    tupleDef("values", new MappingNodeDefinition().setTransformer(new ValuesNodeTransformer())).setRequired(false)
+                    tupleDef("dir", enumDef(FilterDirection.class)).setRequired(false),
+                    tupleDef("sequence", new ListNodeDefinition(stageDef).setMinNumberOfValues(2)),
+                    tupleDef("action", actionsDef)
             );
         }
 
     }
-
-    private static class SequencesNodeTransformer implements NodeTransformer<CNode> {
-
-        private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
-
-        @Override
-        public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
-            try {
-                CMapNode mapNode = (CMapNode) node;
-                List<CTupleNode> tupleNodes = mapNode.getChildren();
-
-                Predicate predicate = null;
-                boolean mustOccur = true;
-                long time = 0;
-                String alias = null;
-                for (CTupleNode tupleNode : tupleNodes) {
-                    String name = tupleNode.getName();
-                    CNode valueNode = tupleNode.getNode();
-                    switch (name) {
-                        case "match":
-                            predicate = TRANSFORMER.transform(valueNode);
-                            break;
-                        case "mustOccur":
-                            mustOccur = (Boolean) getValue(valueNode);
-                            break;
-                        case "time":
-                            time = (Long) getValue(valueNode);
-                            break;
-                        case "alias":
-                            alias = (String) getValue(valueNode);
-                            break;
-                    }
-                }
-                SequenceItem item = new SequenceItem(predicate);
-                if (time > 0) {
-                    item.setTime(time);
-                }
-                item.setMustOccur(mustOccur);
-                item.setAlias(alias);
-                return new CValueNode(item);
-            } catch (Exception ex) {
-                return node;
-            }
-        }
-
-        @Override
-        public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-    }
-
-    private static class ValuesNodeTransformer implements NodeTransformer<CNode> {
-
-        @Override
-        public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
-            Map<String, ValueExtractor> result = new HashMap<>();
-
-            CMapNode mapNode = (CMapNode) node;
-            List<CTupleNode> tupleNodes = mapNode.getChildren();
-            for (CTupleNode tupleNode : tupleNodes) {
-                String name = tupleNode.getName();
-                CValueNode valueNode = (CValueNode) tupleNode.getNode();
-                String value = (String) valueNode.getValue();
-                try {
-                    errors.pushNestedPath(name);
-                    ValueExtractor extractor = ValueExtractorParser.DEFAULT.parse(value);
-                    result.put(name, extractor);
-                } catch (ParseException ex) {
-                    errors.reject(tupleNode, "Invalid expression: \"%s\"", value);
-                } finally {
-                    errors.popNestedPath();
-                }
-            }
-            return new CValueNode(result);
-        }
-
-        @Override
-        public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
-            throw new UnsupportedOperationException("Impossible."); // ValueExtractor
-        }
-
-    }
-
-    private static Object getValue(CNode node) {
-        CValueNode valueNode = (CValueNode) node;
-        return valueNode.getValue();
-    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterActionNodeTransformer.java	Tue Jun 09 12:38:40 2020 +0200
@@ -0,0 +1,96 @@
+package com.passus.st.filter;
+
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.filter.ValueExtractor;
+import com.passus.filter.config.PredicateNodeTransformer;
+import com.passus.filter.config.ValueExtractorTransformer;
+import com.passus.st.filter.SequenceFilter.Action;
+
+import java.util.*;
+
+import static com.passus.config.ConfigurationUtils.extractInteger;
+import static com.passus.st.validation.NodeValidationUtils.validateType;
+
+public class SequenceFilterActionNodeTransformer implements NodeTransformer<CNode> { // Turns the map of configured actions into a CValueNode wrapping a List<Action>.
+
+    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE; // NOTE(review): not referenced anywhere in this class.
+
+    @Override
+    public CValueNode transform(CNode node, Errors errors, ConfigurationContext context) { // Returns null as soon as errors.hasError() is true after processing an entry.
+        CMapNode mapNode = (CMapNode) node; // Unchecked cast: assumes the caller only passes map nodes - TODO confirm against the schema definition.
+
+        List<CTupleNode> tuples = mapNode.getChildren();
+        List<Action> actions;
+        if (tuples.isEmpty()) {
+            actions = Collections.EMPTY_LIST; // Raw shared empty list; never added to, because the loop below does not run for an empty map.
+        } else {
+            actions = new ArrayList<>();
+        }
+
+        final ValueExtractorTransformer transformer = Transformers.fieldValueExtractorTransformer(context); // Used below for the values of the "metric" action.
+        for (CTupleNode tuple : tuples) {
+            String actionName = tuple.getName();
+            try {
+                errors.pushNestedPath(actionName); // Balanced by popNestedPath() in the outer finally, including on the early returns.
+                Action action = null;
+                switch (actionName.toLowerCase()) { // Names are matched case-insensitively; NOTE(review): toLowerCase() without a Locale uses the default locale.
+                    case "multiplication":
+                        int multiplier = extractInteger(tuple.getNode(), errors);
+                        action = new SequenceFilter.MultiplicationAction(multiplier); // Constructed before the errors.hasError() check that follows the switch.
+                        break;
+                    case "metric": {
+                        if (validateType(tuple.getNode(), NodeType.MAP, errors)) { // When this is false, action stays null for this entry.
+                            CMapNode metricNode = (CMapNode) tuple.getNode();
+                            Map<String, ValueExtractor> values = new HashMap<>(metricNode.size());
+
+
+                            for (CTupleNode mapTupleNode : metricNode.getChildren()) { // One ValueExtractor per metric attribute name.
+                                try {
+                                    errors.pushNestedPath(mapTupleNode.getName());
+                                    try {
+                                        String paramName = mapTupleNode.getName();
+                                        ValueExtractor valueExtractor = transformer.transform(mapTupleNode.getNode(), errors, context);
+                                        if (errors.hasError()) {
+                                            return null; // Aborts the whole transform; both enclosing finally blocks still pop their nested paths.
+                                        }
+
+                                        values.put(paramName, valueExtractor);
+                                    } catch (Exception e) { // Any failure is reported generically; the caught exception itself is discarded and the loop continues.
+                                        errors.reject(mapTupleNode.getNode(), "Invalid expression.");
+                                    }
+                                } finally {
+                                    errors.popNestedPath();
+                                }
+                            }
+
+                            action = new SequenceFilter.CreateMetricsAction(values);
+                        }
+
+                        break;
+                    }
+                    default:
+                        throw new IllegalStateException("Not supported action '" + actionName + "'."); // NOTE(review): unknown names throw instead of going through errors.reject(...).
+                }
+
+                if (errors.hasError()) {
+                    return null; // Stop at the first entry that left an error behind.
+                }
+
+                if (action != null) { // null only when the "metric" node failed validateType above.
+                    actions.add(action);
+                }
+            } finally {
+                errors.popNestedPath();
+            }
+        }
+
+        return new CValueNode(actions);
+    }
+
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) { // Reverse direction is not implemented; always throws.
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterStageNodeTransformer.java	Tue Jun 09 12:38:40 2020 +0200
@@ -0,0 +1,61 @@
+package com.passus.st.filter;
+
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.filter.config.PredicateNodeTransformer;
+import com.passus.st.filter.SequenceFilter.Stage;
+
+import java.util.List;
+import java.util.function.Predicate;
+
+public class SequenceFilterStageNodeTransformer implements NodeTransformer<CNode> { // Builds a SequenceFilter.Stage from a map node with the keys match, mustOccur, time and alias.
+
+    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE; // Converts the "match" node into a Predicate.
+
+    @Override
+    public CNode transform(CNode node, Errors errors, ConfigurationContext context) { // Returns a CValueNode wrapping the Stage, or the original node if anything throws.
+        try {
+            CMapNode mapNode = (CMapNode) node;
+            List<CTupleNode> tupleNodes = mapNode.getChildren();
+
+            Predicate predicate = null; // Stays null when the map has no "match" entry.
+            boolean mustOccur = true; // Default used when "mustOccur" is absent.
+            long time = 0; // Default used when "time" is absent.
+            String alias = null; // Default used when "alias" is absent.
+            for (CTupleNode tupleNode : tupleNodes) {
+                String name = tupleNode.getName();
+                CNode valueNode = tupleNode.getNode();
+                switch (name) { // Case-sensitive match; there is no default branch, so unknown keys are ignored.
+                    case "match":
+                        predicate = TRANSFORMER.transform(valueNode); // Single-argument overload: errors and context are not passed along.
+                        break;
+                    case "mustOccur":
+                        mustOccur = (Boolean) getValue(valueNode);
+                        break;
+                    case "time":
+                        time = (Long) getValue(valueNode); // Requires the stored value to be a Long; any other type throws ClassCastException, handled by the catch below.
+                        break;
+                    case "alias":
+                        alias = (String) getValue(valueNode);
+                        break;
+                }
+            }
+
+            Stage stage = new Stage(predicate, time, mustOccur, alias);
+            return new CValueNode(stage);
+        } catch (Exception ex) { // NOTE(review): every failure is swallowed here and nothing is added to errors.
+            return node; // The untransformed input node is handed back to the caller.
+        }
+    }
+
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) { // Reverse direction is not implemented; always throws.
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    private static Object getValue(CNode node) { // Casts the node to CValueNode and returns its raw value.
+        CValueNode valueNode = (CValueNode) node;
+        return valueNode.getValue();
+    }
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterValuesNodeTransformer.java	Tue Jun 09 12:38:40 2020 +0200
@@ -0,0 +1,44 @@
+package com.passus.st.filter;
+
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.filter.ValueExtractor;
+import com.passus.filter.ValueExtractorParser;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SequenceFilterValuesNodeTransformer implements NodeTransformer<CNode> { // Parses each entry of a map node into a name -> ValueExtractor map.
+
+    @Override
+    public CNode transform(CNode node, Errors errors, ConfigurationContext context) { // Returns a CValueNode wrapping the Map<String, ValueExtractor>.
+        Map<String, ValueExtractor> result = new HashMap<>();
+
+        CMapNode mapNode = (CMapNode) node; // Unchecked cast: assumes the caller only passes map nodes - TODO confirm against the schema definition.
+        List<CTupleNode> tupleNodes = mapNode.getChildren();
+        for (CTupleNode tupleNode : tupleNodes) {
+            String name = tupleNode.getName();
+            CValueNode valueNode = (CValueNode) tupleNode.getNode(); // These two casts sit outside the try, so a ClassCastException propagates to the caller.
+            String value = (String) valueNode.getValue();
+            try {
+                errors.pushNestedPath(name); // Balanced by popNestedPath() in the finally below.
+                ValueExtractor extractor = ValueExtractorParser.DEFAULT.parse(value);
+                result.put(name, extractor);
+            } catch (ParseException ex) { // The entry is reported and skipped; the remaining entries are still processed.
+                errors.reject(tupleNode, "Invalid expression: \"%s\"", value);
+            } finally {
+                errors.popNestedPath();
+            }
+        }
+        return new CValueNode(result); // Holds only the entries that parsed successfully.
+    }
+
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
+        throw new UnsupportedOperationException("Impossible."); // ValueExtractor (presumably cannot be converted back to its source expression - confirm)
+    }
+
+}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterX.java	Tue Jun 09 12:00:22 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,376 +0,0 @@
-package com.passus.st.filter;
-
-import com.passus.commons.Assert;
-import com.passus.commons.metric.MapMetric;
-import com.passus.commons.time.SystemTimeGenerator;
-import com.passus.commons.time.TimeAware;
-import com.passus.commons.time.TimeGenerator;
-import com.passus.config.Configuration;
-import com.passus.config.ConfigurationContext;
-import com.passus.config.schema.*;
-import com.passus.filter.ValueExtractor;
-import com.passus.st.client.FlowContext;
-import com.passus.st.config.HeaderOperationNodeDefinition;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.function.Predicate;
-
-import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
-import static com.passus.st.config.CommonNodeDefs.*;
-
-public class SequenceFilterX implements FlowFilter, TimeAware {
-
-    public static final String TYPE = "sequenceX";
-
-    private static final int DEFAULT_CHAINS_NUM = 32;
-
-    private static final String STAGE_PREFIX = "_i";
-
-    private TimeGenerator timeGenerator = new SystemTimeGenerator();
-
-    private final List<StageChain> chains = new ArrayList<>(DEFAULT_CHAINS_NUM);
-
-    private final FlowFilterFactory filterFactory;
-
-    private Stage rootStage;
-
-    private FilterDirection direction = FilterDirection.IN;
-
-    private Action[] actions;
-
-    public SequenceFilterX() {
-        this(FlowFilterFactory.DEFAULT_FACTORY);
-    }
-
-    public SequenceFilterX(FlowFilterFactory filterFactory) {
-        Assert.notNull(filterFactory, "filterFactory");
-        this.filterFactory = filterFactory;
-    }
-
-    public void init(final Stage[] stages, final Action[] actions) {
-        Assert.notNull(stages, "stages");
-        Assert.notNull(actions, "actions");
-        if (stages.length < 2) {
-            throw new IllegalArgumentException("At least two Stage required.");
-        } else if (stages.length == 0) {
-            throw new IllegalArgumentException("At least one Action required.");
-        }
-
-        Stage lastStage = null;
-        Stage rootStage = null;
-        for (int i = 0; i < stages.length; i++) {
-            if (stages[i] == null) {
-                throw new NullPointerException("One Stage is null.");
-            }
-
-            Stage stage = stages[i];
-            stage.index(i);
-            if (i == 0) {
-                rootStage = stage;
-            } else if (lastStage != null) {
-                lastStage.next = stage;
-            }
-
-            lastStage = stage;
-        }
-
-        this.rootStage = rootStage;
-        this.actions = actions;
-    }
-
-    @Override
-    public void configure(Configuration config, ConfigurationContext context) {
-        List<Stage> stagesList = (List<Stage>) config.get("sequence");
-        Stage[] stagesArr = stagesList.toArray(new Stage[0]);
-
-
-        List<Action> actionsList = (List<Action>) config.get("actions");
-        Action[] actionsArr = actionsList.toArray(new Action[0]);
-
-        init(stagesArr, actionsArr);
-    }
-
-    @Override
-    public TimeGenerator getTimeGenerator() {
-        return timeGenerator;
-    }
-
-    @Override
-    public void setTimeGenerator(TimeGenerator generator) {
-        Assert.notNull(generator, "generator");
-        this.timeGenerator = generator;
-    }
-
-    @Override
-    public FlowFilter instanceForWorker(int index) {
-        throw new RuntimeException("Not implemented.");
-    }
-
-    private void fireAction(StageChain chain, FlowContext flowContext) {
-        for (Action action : actions) {
-            action.execute(chain, flowContext);
-        }
-    }
-
-    private boolean nextStage(StageChain chain, long now, MessageWrapper value, FlowContext context) {
-        if (value != null) {
-            chain.updateOldValue(value, chain.stage.getAliases());
-        }
-
-        if (chain.stage.hasNext()) {
-            chain.setStage(now, chain.stage.next);
-            return true;
-        }
-
-        fireAction(chain, context);
-        return false;
-    }
-
-    private void processValue(long now, MessageWrapper value, FlowContext context) {
-        Iterator<StageChain> it = chains.iterator();
-        if (it.hasNext()) {
-            StageChain chain = it.next();
-            for (; ; ) {
-                if (chain.expire < now) {
-                    if (chain.stage.mustOccur) {
-                        it.remove();
-                        continue;
-                    }
-
-                    if (!nextStage(chain, now, value, context)) {
-                        it.remove();
-                    } else {
-                        continue;
-                    }
-                } else if (chain.stage.mustOccur) {
-                    Map<String, Object> filterable = chain.mergeValue(value);
-                    if (chain.stage.match(filterable)) {
-                        if (!nextStage(chain, now, value, context)) {
-                            it.remove();
-                        }
-                    }
-                } else if (!nextStage(chain, now, null, context)) {
-                    it.remove();
-                }
-
-                if (!it.hasNext()) break;
-                chain = it.next();
-            }
-        }
-
-        final Map<String, Object> valueMap = new HashMap<>(2);
-        valueMap.put("req", value.getReq());
-        valueMap.put("resp", value.getResp());
-        if (rootStage.match(valueMap)) {
-            StageChain chain = new StageChain(valueMap);
-            chain.setStage(now, rootStage);
-            if (nextStage(chain, now, value, context)) {
-                chains.add(chain);
-            }
-        }
-    }
-
-    @Override
-    public int filterOutbound(Object req, Object resp, FlowContext context) {
-        if (direction == FilterDirection.BOTH || direction == FilterDirection.OUT) {
-            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
-            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
-        }
-
-        return DUNNO;
-    }
-
-    @Override
-    public int filterInbound(Object req, Object resp, FlowContext context) {
-        if (direction == FilterDirection.BOTH || direction == FilterDirection.IN) {
-            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
-            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
-        }
-
-        return DUNNO;
-    }
-
-
-    public static class Stage {
-
-        private final Predicate predicate;
-
-        private final boolean mustOccur;
-
-        private final long time;
-
-        private final String alias;
-
-        private String[] aliases;
-
-        private Stage next;
-
-        public Stage(Predicate predicate, long time, boolean mustOccur, String alias) {
-            this.predicate = predicate;
-            this.time = time;
-            this.mustOccur = mustOccur;
-            this.alias = alias;
-
-            if (alias != null) {
-                aliases = new String[2];
-                aliases[1] = alias;
-            } else {
-                aliases = new String[1];
-            }
-        }
-
-        private void index(int index) {
-            aliases[0] = STAGE_PREFIX + index;
-        }
-
-        public Predicate getPredicate() {
-            return predicate;
-        }
-
-        public boolean isMustOccur() {
-            return mustOccur;
-        }
-
-        public long getTime() {
-            return time;
-        }
-
-        public String getAlias() {
-            return alias;
-        }
-
-        private boolean hasNext() {
-            return next != null;
-        }
-
-        private boolean match(Map<String, Object> value) {
-            return predicate.test(value);
-        }
-
-        private String[] getAliases() {
-            return aliases;
-        }
-    }
-
-    private final class StageChain {
-
-        private Stage stage;
-
-        private final Map<String, Object> valueMap;
-
-        private long expire;
-
-        public StageChain(Map<String, Object> valueMap) {
-            this.valueMap = valueMap;
-        }
-
-        public void setStage(long now, Stage stage) {
-            this.expire = now + stage.time;
-            this.stage = stage;
-        }
-
-        public Map<String, Object> mergeValue(MessageWrapper newValue) {
-            Map<String, Object> out = new HashMap<>(2);
-            out.put("req", newValue.getReq());
-            out.put("resp", newValue.getResp());
-            return out;
-        }
-
-        public void updateOldValue(MessageWrapper newValue, String[] aliases) {
-            for (String alias : aliases) {
-                valueMap.put(alias, newValue);
-            }
-        }
-
-    }
-
-    public static abstract class Action {
-
-        public abstract void execute(StageChain chain, FlowContext flowContext);
-
-    }
-
-    public static final class CreateMetricsAction extends Action {
-
-        private final Map<String, ValueExtractor> values;
-
-        public CreateMetricsAction() {
-            this(Collections.EMPTY_MAP);
-        }
-
-        public CreateMetricsAction(Map<String, ValueExtractor> values) {
-            this.values = values;
-        }
-
-        @Override
-        public void execute(StageChain chain, FlowContext flowContext) {
-            Map<String, Serializable> result;
-
-            if (values.isEmpty()) {
-                result = Collections.emptyMap();
-            } else {
-                result = new HashMap<>();
-                Map<String, Object> persisted = chain.valueMap;
-                for (Map.Entry<String, ValueExtractor> e : values.entrySet()) {
-                    Object value = e.getValue().extract(persisted);
-                    if (value instanceof Serializable) {
-                        result.put(e.getKey(), (Serializable) value);
-                    }
-                }
-            }
-
-            MapMetric metric = new MapMetric("sequence", result);
-            throw new RuntimeException("Not implemented.");
-        }
-    }
-
-    public static final class MultiplicationAction extends Action {
-
-        private final int multiplier = 1;
-
-        public MultiplicationAction(int multiplier) {
-            Assert.greaterThanZero(multiplier, "multiplier");
-        }
-
-        public int getMultiplier() {
-            return multiplier;
-        }
-
-        @Override
-        public void execute(StageChain chain, FlowContext flowContext) {
-            throw new RuntimeException("Not implemented.");
-        }
-    }
-
-    public static class NodeDefCreator implements NodeDefinitionCreator {
-
-        @Override
-        public NodeDefinition create() {
-            MapNodeDefinition stageDef = mapDef(
-                    tupleDef("match", MSG_PREDICATE_DEF),
-                    tupleDef("mustOccur", BOOLEAN_DEF).setRequired(false),
-                    tupleDef("time", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
-                    tupleDef("alias", STRING_DEF).setRequired(false)
-            );
-            stageDef.setTransformer(new SequenceFilterXStageNodeTransformer());
-
-            HeaderOperationNodeDefinition metricActionDef = new HeaderOperationNodeDefinition(
-                    valueDef()
-            );
-
-            KeyNameVaryListNodeDefinition actionsDef = new KeyNameVaryListNodeDefinition()
-                    .setNodeTransformer(new SequenceFilterXActionNodeTransformer())
-                    .add("metric", metricActionDef)
-                    .add("multiplication", INT_GREATER_THAN_ZERO_DEF);
-
-
-            return mapDef(
-                    tupleDef("dir", enumDef(FilterDirection.class)).setRequired(false),
-                    tupleDef("sequence", new ListNodeDefinition(stageDef).setMinNumberOfValues(2)),
-                    tupleDef("action", actionsDef).setRequired(false)
-            );
-        }
-
-    }
-}
--- a/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXActionNodeTransformer.java	Tue Jun 09 12:00:22 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,96 +0,0 @@
-package com.passus.st.filter;
-
-import com.passus.config.*;
-import com.passus.config.schema.NodeTransformer;
-import com.passus.config.validation.Errors;
-import com.passus.filter.ValueExtractor;
-import com.passus.filter.config.PredicateNodeTransformer;
-import com.passus.filter.config.ValueExtractorTransformer;
-import com.passus.st.filter.SequenceFilterX.Action;
-
-import java.util.*;
-
-import static com.passus.config.ConfigurationUtils.extractInteger;
-import static com.passus.st.validation.NodeValidationUtils.validateType;
-
-public class SequenceFilterXActionNodeTransformer implements NodeTransformer<CNode> {
-
-    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
-
-    @Override
-    public CValueNode transform(CNode node, Errors errors, ConfigurationContext context) {
-        CMapNode mapNode = (CMapNode) node;
-
-        List<CTupleNode> tuples = mapNode.getChildren();
-        List<Action> actions;
-        if (tuples.isEmpty()) {
-            actions = Collections.EMPTY_LIST;
-        } else {
-            actions = new ArrayList<>();
-        }
-
-        final ValueExtractorTransformer transformer = Transformers.fieldValueExtractorTransformer(context);
-        for (CTupleNode tuple : tuples) {
-            String actionName = tuple.getName();
-            try {
-                errors.pushNestedPath(actionName);
-                Action action = null;
-                switch (actionName.toLowerCase()) {
-                    case "multiplication":
-                        int multiplier = extractInteger(tuple.getNode(), errors);
-                        action = new SequenceFilterX.MultiplicationAction(multiplier);
-                        break;
-                    case "metric": {
-                        if (validateType(tuple.getNode(), NodeType.MAP, errors)) {
-                            CMapNode metricNode = (CMapNode) tuple.getNode();
-                            Map<String, ValueExtractor> values = new HashMap<>(metricNode.size());
-
-
-                            for (CTupleNode mapTupleNode : metricNode.getChildren()) {
-                                try {
-                                    errors.pushNestedPath(mapTupleNode.getName());
-                                    try {
-                                        String paramName = mapTupleNode.getName();
-                                        ValueExtractor valueExtractor = transformer.transform(mapTupleNode.getNode(), errors, context);
-                                        if (errors.hasError()) {
-                                            return null;
-                                        }
-
-                                        values.put(paramName, valueExtractor);
-                                    } catch (Exception e) {
-                                        errors.reject(mapTupleNode.getNode(), "Invalid expression.");
-                                    }
-                                } finally {
-                                    errors.popNestedPath();
-                                }
-                            }
-
-                            action = new SequenceFilterX.CreateMetricsAction(values);
-                        }
-
-                        break;
-                    }
-                    default:
-                        throw new IllegalStateException("Not supported action '" + actionName + "'.");
-                }
-
-                if (errors.hasError()) {
-                    return null;
-                }
-
-                if (action != null) {
-                    actions.add(action);
-                }
-            } finally {
-                errors.popNestedPath();
-            }
-        }
-
-        return new CValueNode(actions);
-    }
-
-    @Override
-    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-}
\ No newline at end of file
--- a/stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXStageNodeTransformer.java	Tue Jun 09 12:00:22 2020 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,61 +0,0 @@
-package com.passus.st.filter;
-
-import com.passus.config.*;
-import com.passus.config.schema.NodeTransformer;
-import com.passus.config.validation.Errors;
-import com.passus.filter.config.PredicateNodeTransformer;
-import com.passus.st.filter.SequenceFilterX.Stage;
-
-import java.util.List;
-import java.util.function.Predicate;
-
-public class SequenceFilterXStageNodeTransformer implements NodeTransformer<CNode> {
-
-    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
-
-    @Override
-    public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
-        try {
-            CMapNode mapNode = (CMapNode) node;
-            List<CTupleNode> tupleNodes = mapNode.getChildren();
-
-            Predicate predicate = null;
-            boolean mustOccur = true;
-            long time = 0;
-            String alias = null;
-            for (CTupleNode tupleNode : tupleNodes) {
-                String name = tupleNode.getName();
-                CNode valueNode = tupleNode.getNode();
-                switch (name) {
-                    case "match":
-                        predicate = TRANSFORMER.transform(valueNode);
-                        break;
-                    case "mustOccur":
-                        mustOccur = (Boolean) getValue(valueNode);
-                        break;
-                    case "time":
-                        time = (Long) getValue(valueNode);
-                        break;
-                    case "alias":
-                        alias = (String) getValue(valueNode);
-                        break;
-                }
-            }
-
-            Stage stage = new Stage(predicate, time, mustOccur, alias);
-            return new CValueNode(stage);
-        } catch (Exception ex) {
-            return node;
-        }
-    }
-
-    @Override
-    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    private static Object getValue(CNode node) {
-        CValueNode valueNode = (CValueNode) node;
-        return valueNode.getValue();
-    }
-}
\ No newline at end of file
--- a/stress-tester/src/test/java/com/passus/st/filter/SequenceFilterTest.java	Tue Jun 09 12:00:22 2020 +0200
+++ b/stress-tester/src/test/java/com/passus/st/filter/SequenceFilterTest.java	Tue Jun 09 12:38:40 2020 +0200
@@ -1,13 +1,8 @@
 package com.passus.st.filter;
 
-import com.passus.commons.metric.MapMetric;
+import com.passus.commons.metric.Metric;
 import com.passus.commons.utils.ResourceUtils;
 import com.passus.config.validation.Errors;
-import com.passus.filter.AndPredicate;
-import com.passus.filter.BeanValueExtractor;
-import com.passus.filter.ComparisonOperator;
-import com.passus.filter.ComparisonPredicate;
-import com.passus.filter.UnmutableValueExtractor;
 import com.passus.filter.ValueExtractor;
 import com.passus.filter.ValueExtractorParser;
 import com.passus.filter.config.PredicateNodeTransformer;
@@ -16,55 +11,85 @@
 import com.passus.net.http.HttpResponse;
 import com.passus.net.http.HttpResponseBuilder;
 import com.passus.st.AppUtils;
-import com.passus.st.filter.SequenceFilter.SequenceItem;
-
-import java.io.File;
-import java.io.Serializable;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Predicate;
-import static org.testng.AssertJUnit.*;
+import com.passus.st.Pair;
+import com.passus.st.client.Event;
+import com.passus.st.client.FlowContext;
+import com.passus.st.client.PerNameMetricsContainer;
+import com.passus.st.client.SessionPayloadEvent;
+import com.passus.st.emitter.SessionInfo;
+import com.passus.st.filter.SequenceFilter.Action;
+import com.passus.st.filter.SequenceFilter.CreateMetricsAction;
+import com.passus.st.filter.SequenceFilter.MultiplicationAction;
+import com.passus.st.filter.SequenceFilter.Stage;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-/**
- *
- * @author mikolaj.podbielski
- */
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.*;
+import java.util.function.Predicate;
+
+import static com.passus.st.utils.HttpMessageAssert.assertMessages;
+import static org.testng.AssertJUnit.assertEquals;
+
 public class SequenceFilterTest {
 
     private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
 
-    @BeforeClass
-    public static void beforeClass() {
-        AppUtils.registerAll();
-    }
+    private List<Pair<HttpRequest, HttpResponse>> pairs = new ArrayList<>();
 
-    @AfterClass
-    public static void afterClass() {
-        AppUtils.unregisterAll();
-    }
-
-    @Test
-    public void testFilterInbound() throws Exception {
+    @BeforeClass
+    public void beforeClass() {
+        AppUtils.registerAll();
         HttpRequest req0 = HttpRequestBuilder.get("http://example.com/res1").build();
         HttpResponse resp0 = HttpResponseBuilder.ok().cookie("id", "123").build();
+        pairs.add(new Pair<>(req0, resp0));
 
         HttpRequest req1 = HttpRequestBuilder.get("http://example.com/res2")
                 .cookie("id", "123").header("Xyz", "abc").build();
         HttpResponse resp1 = HttpResponseBuilder.ok().build();
-
-        TestSequenceListener listener = new TestSequenceListener();
+        pairs.add(new Pair<>(req1, resp1));
+    }
 
-        SequenceItem[] seqItems = {
-            item("{\"@req.uri\": \"/res1\", \"@resp.getCookie('id')\": {$neq: \"0\"}}", 0),
-            item("{\"@req.getHeader('xyz')\": \"abc\", \"@req.getCookie('id')\": \"@_i0.resp.getCookie('id')\"}", 1, "last"),};
+    @AfterClass
+    public void afterClass() {
+        AppUtils.unregisterAll();
+    }
+
+    private static Stage stage(String predicateString) throws Exception {
+        return stage(predicateString, null);
+    }
+
+    private static Stage stage(String predicateString, String alias) throws Exception {
+        return stage(predicateString, 60_000, true, alias);
+    }
+
+    private static Stage stage(String predicateString, long time, boolean mustOccur, String alias) throws Exception {
+        Predicate predicate = TRANSFORMER.transform(predicateString);
+        return new Stage(predicate, time, mustOccur, alias);
+    }
+
+    private static Action metricsAction(Map<String, ValueExtractor> values) {
+        return new CreateMetricsAction(values);
+    }
+
+    private static Action multiAction(int multiplier) {
+        return new MultiplicationAction(multiplier);
+    }
+
+    private static ValueExtractor value(String s) throws ParseException {
+        return ValueExtractorParser.DEFAULT.parse(s);
+    }
+
+    @Test
+    public void testSequenceMetricAction() throws Exception {
+        Stage[] stages = {
+                stage("{\"@req.uri\": \"/res1\"}"),
+                stage("{\"@req.uri\": \"/res2\"}", "last")
+        };
 
         Map<String, ValueExtractor> values = new HashMap<>();
         values.put("code", value("@_i0.resp.status.code"));
@@ -72,38 +97,69 @@
         values.put("header", value("@_i1.req.getHeader('xyz')"));
         values.put("cookie", value("@last.req.getCookie('id')"));
         values.put("seqName", value("example sequence"));
-
-        SequenceFilter filter = new SequenceFilter().instanceForWorker(0);
-        filter.setListener(listener);
-        filter.setSeqItems(seqItems);
-        filter.setValues(values);
+        Action[] actions = {
+                metricsAction(values)
+        };
 
-        filter.filterInbound(req0, resp0, null);
-        filter.filterInbound(req1, resp1, null);
+        SequenceFilter filter = new SequenceFilter();
+        filter.init(stages, actions);
 
-        assertEquals(1, listener.events.size());
-        Map<String, Serializable> extracted = listener.events.get(0).getAttributesValue();
-        assertEquals(200, extracted.get("code"));
-        assertEquals("example.com", extracted.get("xhost").toString());
-        assertEquals("abc", extracted.get("header").toString());
-        assertEquals("123", extracted.get("cookie").toString());
-        assertEquals("example sequence", extracted.get("seqName").toString());
+        pairs.forEach((p) -> {
+            filter.filterInbound(p.getValue1(), p.getValue2(), null);
+        });
+
+        PerNameMetricsContainer metricsContainer = new PerNameMetricsContainer();
+        filter.writeMetrics(metricsContainer);
+
+        Collection<Metric> metrics = metricsContainer.getMetrics();
+        assertEquals(1, metrics.size());
+
+        Metric metric = metrics.iterator().next();
+        assertEquals(200, metric.getAttributeValue("code"));
+        assertEquals("example.com", metric.getAttributeValue("xhost").toString());
+        assertEquals("abc", metric.getAttributeValue("header").toString());
+        assertEquals("123", metric.getAttributeValue("cookie").toString());
+        assertEquals("example sequence", metric.getAttributeValue("seqName"));
     }
 
-    private static SequenceItem item(String predicateString, int num) throws Exception {
-        Predicate predicate = TRANSFORMER.transform(predicateString);
-        return new SequenceItem(predicate, num);
-    }
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSequenceMultiplicationAction() throws Exception {
+        Stage[] stages = {
+                stage("{\"@req.uri\": \"/res1\"}"),
+                stage("{\"@req.uri\": \"/res2\"}", "last")
+        };
 
-    private static SequenceItem item(String predicateString, int num, String alias) throws Exception {
-        Predicate predicate = TRANSFORMER.transform(predicateString);
-        SequenceItem si = new SequenceItem(predicate, num);
-        si.setAlias(alias);
-        return si;
-    }
+        int multiplier = 3;
+        Action[] actions = {
+                multiAction(multiplier)
+        };
 
-    private static ValueExtractor value(String s) throws ParseException {
-        return ValueExtractorParser.DEFAULT.parse(s);
+        SequenceFilter filter = new SequenceFilter();
+        filter.setDirection(FilterDirection.OUT);
+        filter.init(stages, actions);
+
+        SessionInfo sessionInfo = new SessionInfo("1.1.1.1", 11000, "1.1.1.2", 80);
+        Deque<Event> queue = new LinkedList<>();
+        FlowContext context = new FlowContext(sessionInfo, queue);
+        pairs.forEach((p) -> {
+            filter.filterOutbound(p.getValue1(), p.getValue2(), context);
+        });
+
+        assertEquals(multiplier * stages.length, queue.size());
+        Iterator<Event> it = queue.iterator();
+        Pair<HttpRequest, HttpResponse> pair1 = pairs.get(0);
+        Pair<HttpRequest, HttpResponse> pair2 = pairs.get(1);
+        for (int i = 0; i < multiplier; i++) {
+            SessionPayloadEvent<HttpRequest, HttpResponse> event = (SessionPayloadEvent<HttpRequest, HttpResponse>) it.next();
+
+            assertMessages(pair1.getValue1(), event.getRequest());
+            assertMessages(pair1.getValue2(), event.getResponse());
+
+            event = (SessionPayloadEvent<HttpRequest, HttpResponse>) it.next();
+            assertMessages(pair2.getValue1(), event.getRequest());
+            assertMessages(pair2.getValue2(), event.getResponse());
+        }
     }
 
     @Test
@@ -113,53 +169,32 @@
 
         Errors errors = new Errors();
         List<FlowFilter> filters = FlowFiltersConfigurator.getFilters(filterConfig, errors, null);
-        FilterTestUtils.printErrors(errors);
 
         assertEquals(0, errors.getErrorCount());
         assertEquals(1, filters.size());
-        assertTrue(filters.get(0) instanceof SequenceFilter);
-        SequenceFilter filter = (SequenceFilter) filters.get(0);
-
-        SequenceItem[] seqItems = filter.getSeqItems();
-        assertEquals(2, seqItems.length);
-        assertSeqItemValue(seqItems[0], 0, 10000, null);
-        assertSeqItemValue(seqItems[1], 1, 20000, "last");
-
-        AndPredicate p0 = (AndPredicate) seqItems[0].getPredicate();
-        assertEquals(2, p0.getSubPredicates().size());
-        ComparisonPredicate sp1 = (ComparisonPredicate) p0.getSubPredicates().get(1);
-        assertEquals(ComparisonOperator.NOT_EQUAL, sp1.getOperator());
-
-        assertEquals("resp.getCookie('id')", sp1.getFieldName());
-
-        UnmutableValueExtractor uve = (UnmutableValueExtractor) sp1.getPattern();
-        assertEquals(123, uve.extract(null));
 
-        Map<String, ValueExtractor> values = filter.getValues();
-        assertEquals(4, values.size());
-        ValueExtractor extractor = values.get("header");
-        assertTrue(extractor instanceof BeanValueExtractor);
-        BeanValueExtractor bve = (BeanValueExtractor) extractor;
-        assertEquals("last.req.getHeader('xyz')", bve.getFieldName());
-    }
+        SequenceFilter filter = (SequenceFilter) filters.get(0);
+        Action[] actions = filter.getActions();
 
-    private static void assertSeqItemValue(SequenceItem item, int num, long time, String alias) {
-        assertEquals("num", num, item.getNum());
-        assertEquals("time", time, item.getTime());
-        assertEquals("alias", alias, item.getAlias());
-    }
+        assertEquals(2, actions.length);
+        CreateMetricsAction cmAction = (CreateMetricsAction) actions[0];
 
-    public static class TestSequenceListener implements SequenceListener {
+        assertEquals(4, cmAction.getValues().size());
 
-        ArrayList<MapMetric> events = new ArrayList<>();
+        int stagesNum = 0;
+        Stage stage = filter.getRootStage();
+        while (stage != null) {
+            stagesNum++;
+            if (stagesNum == 1) {
+                assertEquals(null, stage.getAlias());
+            } else if (stagesNum == 2) {
+                assertEquals(20000, stage.getTime());
+                assertEquals("last", stage.getAlias());
+            }
 
-        @Override
-        public void sequenceDetected(MapMetric sequence) {
-            events.add(sequence);
+            stage = stage.next;
         }
 
-        public void reset() {
-            events.clear();
-        }
+        assertEquals(2, stagesNum);
     }
-}
+}
\ No newline at end of file
--- a/stress-tester/src/test/resources/com/passus/st/client/http/filter/sequence.yml	Tue Jun 09 12:00:22 2020 +0200
+++ b/stress-tester/src/test/resources/com/passus/st/client/http/filter/sequence.yml	Tue Jun 09 12:38:40 2020 +0200
@@ -6,8 +6,10 @@
         mustOccur: true
         time: 20000
         alias: last
-    values:
+    action:
+      metric:
         code: '@_i0.resp.status.code'
         xhost: '@_i0.req.url.host'
         header: "@last.req.getHeader('xyz')"
         cookie: "@_i1.req.getCookie('id')"
+      multiplication: 5