changeset 1130:f4b74fec7584

SequenceFilterX - in progress
author Devel 2
date Mon, 08 Jun 2020 11:55:22 +0200
parents c34359011cf5
children 0759f67a8f79
files stress-tester/src/main/java/com/passus/st/filter/ stress-tester/src/main/java/com/passus/st/filter/ stress-tester/src/main/java/com/passus/st/filter/
diffstat 3 files changed, 533 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/	Mon Jun 08 11:55:22 2020 +0200
@@ -0,0 +1,376 @@
+import com.passus.commons.Assert;
+import com.passus.commons.metric.MapMetric;
+import com.passus.commons.time.SystemTimeGenerator;
+import com.passus.commons.time.TimeAware;
+import com.passus.commons.time.TimeGenerator;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.schema.*;
+import com.passus.filter.ValueExtractor;
+import java.util.*;
+import java.util.function.Predicate;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
+import static*;
+public class SequenceFilterX implements FlowFilter, TimeAware {
+    public static final String TYPE = "sequenceX";
+    private static final int DEFAULT_CHAINS_NUM = 32;
+    private static final String STAGE_PREFIX = "_i";
+    private TimeGenerator timeGenerator = new SystemTimeGenerator();
+    private final List<StageChain> chains = new ArrayList<>(DEFAULT_CHAINS_NUM);
+    private final FlowFilterFactory filterFactory;
+    private Stage rootStage;
+    private FilterDirection direction = FilterDirection.IN;
+    private Action[] actions;
+    public SequenceFilterX() {
+        this(FlowFilterFactory.DEFAULT_FACTORY);
+    }
+    public SequenceFilterX(FlowFilterFactory filterFactory) {
+        Assert.notNull(filterFactory, "filterFactory");
+        this.filterFactory = filterFactory;
+    }
+    public void init(final Stage[] stages, final Action[] actions) {
+        Assert.notNull(stages, "stages");
+        Assert.notNull(actions, "actions");
+        if (stages.length < 2) {
+            throw new IllegalArgumentException("At least two Stage required.");
+        } else if (stages.length == 0) {
+            throw new IllegalArgumentException("At least one Action required.");
+        }
+        Stage lastStage = null;
+        Stage rootStage = null;
+        for (int i = 0; i < stages.length; i++) {
+            if (stages[i] == null) {
+                throw new NullPointerException("One Stage is null.");
+            }
+            Stage stage = stages[i];
+            stage.index(i);
+            if (i == 0) {
+                rootStage = stage;
+            } else if (lastStage != null) {
+       = stage;
+            }
+            lastStage = stage;
+        }
+        this.rootStage = rootStage;
+        this.actions = actions;
+    }
+    @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        List<Stage> stagesList = (List<Stage>) config.get("sequence");
+        Stage[] stagesArr = stagesList.toArray(new Stage[0]);
+        List<Action> actionsList = (List<Action>) config.get("actions");
+        Action[] actionsArr = actionsList.toArray(new Action[0]);
+        init(stagesArr, actionsArr);
+    }
+    @Override
+    public TimeGenerator getTimeGenerator() {
+        return timeGenerator;
+    }
+    @Override
+    public void setTimeGenerator(TimeGenerator generator) {
+        Assert.notNull(generator, "generator");
+        this.timeGenerator = generator;
+    }
+    @Override
+    public FlowFilter instanceForWorker(int index) {
+        throw new RuntimeException("Not implemented.");
+    }
+    private void fireAction(StageChain chain, FlowContext flowContext) {
+        for (Action action : actions) {
+            action.execute(chain, flowContext);
+        }
+    }
+    private boolean nextStage(StageChain chain, long now, MessageWrapper value, FlowContext context) {
+        if (value != null) {
+            chain.updateOldValue(value, chain.stage.getAliases());
+        }
+        if (chain.stage.hasNext()) {
+            chain.setStage(now,;
+            return true;
+        }
+        fireAction(chain, context);
+        return false;
+    }
+    private void processValue(long now, MessageWrapper value, FlowContext context) {
+        Iterator<StageChain> it = chains.iterator();
+        if (it.hasNext()) {
+            StageChain chain =;
+            for (; ; ) {
+                if (chain.expire < now) {
+                    if (chain.stage.mustOccur) {
+                        it.remove();
+                        continue;
+                    }
+                    if (!nextStage(chain, now, value, context)) {
+                        it.remove();
+                    } else {
+                        continue;
+                    }
+                } else if (chain.stage.mustOccur) {
+                    Map<String, Object> filterable = chain.mergeValue(value);
+                    if (chain.stage.match(filterable)) {
+                        if (!nextStage(chain, now, value, context)) {
+                            it.remove();
+                        }
+                    }
+                } else if (!nextStage(chain, now, null, context)) {
+                    it.remove();
+                }
+                if (!it.hasNext()) break;
+                chain =;
+            }
+        }
+        final Map<String, Object> valueMap = new HashMap<>(2);
+        valueMap.put("req", value.getReq());
+        valueMap.put("resp", value.getResp());
+        if (rootStage.match(valueMap)) {
+            StageChain chain = new StageChain(valueMap);
+            chain.setStage(now, rootStage);
+            if (nextStage(chain, now, value, context)) {
+                chains.add(chain);
+            }
+        }
+    }
+    @Override
+    public int filterOutbound(Object req, Object resp, FlowContext context) {
+        if (direction == FilterDirection.BOTH || direction == FilterDirection.OUT) {
+            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
+            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
+        }
+        return DUNNO;
+    }
+    @Override
+    public int filterInbound(Object req, Object resp, FlowContext context) {
+        if (direction == FilterDirection.BOTH || direction == FilterDirection.IN) {
+            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
+            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
+        }
+        return DUNNO;
+    }
+    public static class Stage {
+        private final Predicate predicate;
+        private final boolean mustOccur;
+        private final long time;
+        private final String alias;
+        private String[] aliases;
+        private Stage next;
+        public Stage(Predicate predicate, long time, boolean mustOccur, String alias) {
+            this.predicate = predicate;
+            this.time = time;
+            this.mustOccur = mustOccur;
+            this.alias = alias;
+            if (alias != null) {
+                aliases = new String[2];
+                aliases[1] = alias;
+            } else {
+                aliases = new String[1];
+            }
+        }
+        private void index(int index) {
+            aliases[0] = STAGE_PREFIX + index;
+        }
+        public Predicate getPredicate() {
+            return predicate;
+        }
+        public boolean isMustOccur() {
+            return mustOccur;
+        }
+        public long getTime() {
+            return time;
+        }
+        public String getAlias() {
+            return alias;
+        }
+        private boolean hasNext() {
+            return next != null;
+        }
+        private boolean match(Map<String, Object> value) {
+            return predicate.test(value);
+        }
+        private String[] getAliases() {
+            return aliases;
+        }
+    }
+    private final class StageChain {
+        private Stage stage;
+        private final Map<String, Object> valueMap;
+        private long expire;
+        public StageChain(Map<String, Object> valueMap) {
+            this.valueMap = valueMap;
+        }
+        public void setStage(long now, Stage stage) {
+            this.expire = now + stage.time;
+            this.stage = stage;
+        }
+        public Map<String, Object> mergeValue(MessageWrapper newValue) {
+            Map<String, Object> out = new HashMap<>(2);
+            out.put("req", newValue.getReq());
+            out.put("resp", newValue.getResp());
+            return out;
+        }
+        public void updateOldValue(MessageWrapper newValue, String[] aliases) {
+            for (String alias : aliases) {
+                valueMap.put(alias, newValue);
+            }
+        }
+    }
+    public static abstract class Action {
+        public abstract void execute(StageChain chain, FlowContext flowContext);
+    }
+    public static final class CreateMetricsAction extends Action {
+        private final Map<String, ValueExtractor> values;
+        public CreateMetricsAction() {
+            this(Collections.EMPTY_MAP);
+        }
+        public CreateMetricsAction(Map<String, ValueExtractor> values) {
+            this.values = values;
+        }
+        @Override
+        public void execute(StageChain chain, FlowContext flowContext) {
+            Map<String, Serializable> result;
+            if (values.isEmpty()) {
+                result = Collections.emptyMap();
+            } else {
+                result = new HashMap<>();
+                Map<String, Object> persisted = chain.valueMap;
+                for (Map.Entry<String, ValueExtractor> e : values.entrySet()) {
+                    Object value = e.getValue().extract(persisted);
+                    if (value instanceof Serializable) {
+                        result.put(e.getKey(), (Serializable) value);
+                    }
+                }
+            }
+            MapMetric metric = new MapMetric("sequence", result);
+            throw new RuntimeException("Not implemented.");
+        }
+    }
+    public static final class MultiplicationAction extends Action {
+        private final int multiplier = 1;
+        public MultiplicationAction(int multiplier) {
+            Assert.greaterThanZero(multiplier, "multiplier");
+        }
+        public int getMultiplier() {
+            return multiplier;
+        }
+        @Override
+        public void execute(StageChain chain, FlowContext flowContext) {
+            throw new RuntimeException("Not implemented.");
+        }
+    }
+    public static class NodeDefCreator implements NodeDefinitionCreator {
+        @Override
+        public NodeDefinition create() {
+            MapNodeDefinition stageDef = mapDef(
+                    tupleDef("match", MSG_PREDICATE_DEF),
+                    tupleDef("mustOccur", BOOLEAN_DEF).setRequired(false),
+                    tupleDef("time", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
+                    tupleDef("alias", STRING_DEF).setRequired(false)
+            );
+            stageDef.setTransformer(new SequenceFilterXStageNodeTransformer());
+            HeaderOperationNodeDefinition metricActionDef = new HeaderOperationNodeDefinition(
+                    valueDef()
+            );
+            KeyNameVaryListNodeDefinition actionsDef = new KeyNameVaryListNodeDefinition()
+                    .setNodeTransformer(new SequenceFilterXActionNodeTransformer())
+                    .add("metric", metricActionDef)
+                    .add("multiplication", INT_GREATER_THAN_ZERO_DEF);
+            return mapDef(
+                    tupleDef("dir", enumDef(FilterDirection.class)).setRequired(false),
+                    tupleDef("sequence", new ListNodeDefinition(stageDef).setMinNumberOfValues(2)),
+                    tupleDef("action", actionsDef).setRequired(false)
+            );
+        }
+    }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/	Mon Jun 08 11:55:22 2020 +0200
@@ -0,0 +1,96 @@
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.filter.ValueExtractor;
+import com.passus.filter.config.PredicateNodeTransformer;
+import com.passus.filter.config.ValueExtractorTransformer;
+import java.util.*;
+import static com.passus.config.ConfigurationUtils.extractInteger;
+import static;
+public class SequenceFilterXActionNodeTransformer implements NodeTransformer<CNode> {
+    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
+    @Override
+    public CValueNode transform(CNode node, Errors errors, ConfigurationContext context) {
+        CMapNode mapNode = (CMapNode) node;
+        List<CTupleNode> tuples = mapNode.getChildren();
+        List<Action> actions;
+        if (tuples.isEmpty()) {
+            actions = Collections.EMPTY_LIST;
+        } else {
+            actions = new ArrayList<>();
+        }
+        final ValueExtractorTransformer transformer = Transformers.fieldValueExtractorTransformer(context);
+        for (CTupleNode tuple : tuples) {
+            String actionName = tuple.getName();
+            try {
+                errors.pushNestedPath(actionName);
+                Action action = null;
+                switch (actionName.toLowerCase()) {
+                    case "multiplication":
+                        int multiplier = extractInteger(tuple.getNode(), errors);
+                        action = new SequenceFilterX.MultiplicationAction(multiplier);
+                        break;
+                    case "metric": {
+                        if (validateType(tuple.getNode(), NodeType.MAP, errors)) {
+                            CMapNode metricNode = (CMapNode) tuple.getNode();
+                            Map<String, ValueExtractor> values = new HashMap<>(metricNode.size());
+                            for (CTupleNode mapTupleNode : metricNode.getChildren()) {
+                                try {
+                                    errors.pushNestedPath(mapTupleNode.getName());
+                                    try {
+                                        String paramName = mapTupleNode.getName();
+                                        ValueExtractor valueExtractor = transformer.transform(mapTupleNode.getNode(), errors, context);
+                                        if (errors.hasError()) {
+                                            return null;
+                                        }
+                                        values.put(paramName, valueExtractor);
+                                    } catch (Exception e) {
+                                        errors.reject(mapTupleNode.getNode(), "Invalid expression.");
+                                    }
+                                } finally {
+                                    errors.popNestedPath();
+                                }
+                            }
+                            action = new SequenceFilterX.CreateMetricsAction(values);
+                        }
+                        break;
+                    }
+                    default:
+                        throw new IllegalStateException("Not supported action '" + actionName + "'.");
+                }
+                if (errors.hasError()) {
+                    return null;
+                }
+                if (action != null) {
+                    actions.add(action);
+                }
+            } finally {
+                errors.popNestedPath();
+            }
+        }
+        return new CValueNode(actions);
+    }
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/filter/	Mon Jun 08 11:55:22 2020 +0200
@@ -0,0 +1,61 @@
+import com.passus.config.*;
+import com.passus.config.schema.NodeTransformer;
+import com.passus.config.validation.Errors;
+import com.passus.filter.config.PredicateNodeTransformer;
+import java.util.List;
+import java.util.function.Predicate;
+public class SequenceFilterXStageNodeTransformer implements NodeTransformer<CNode> {
+    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;
+    @Override
+    public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
+        try {
+            CMapNode mapNode = (CMapNode) node;
+            List<CTupleNode> tupleNodes = mapNode.getChildren();
+            Predicate predicate = null;
+            boolean mustOccur = true;
+            long time = 0;
+            String alias = null;
+            for (CTupleNode tupleNode : tupleNodes) {
+                String name = tupleNode.getName();
+                CNode valueNode = tupleNode.getNode();
+                switch (name) {
+                    case "match":
+                        predicate = TRANSFORMER.transform(valueNode);
+                        break;
+                    case "mustOccur":
+                        mustOccur = (Boolean) getValue(valueNode);
+                        break;
+                    case "time":
+                        time = (Long) getValue(valueNode);
+                        break;
+                    case "alias":
+                        alias = (String) getValue(valueNode);
+                        break;
+                }
+            }
+            Stage stage = new Stage(predicate, time, mustOccur, alias);
+            return new CValueNode(stage);
+        } catch (Exception ex) {
+            return node;
+        }
+    }
+    @Override
+    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+    private static Object getValue(CNode node) {
+        CValueNode valueNode = (CValueNode) node;
+        return valueNode.getValue();
+    }
\ No newline at end of file