Mercurial > stress-tester
changeset 1130:f4b74fec7584
SequenceFilterX - in progress
author | Devel 2 |
---|---|
date | Mon, 08 Jun 2020 11:55:22 +0200 |
parents | c34359011cf5 |
children | 0759f67a8f79 |
files | stress-tester/src/main/java/com/passus/st/filter/SequenceFilterX.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXActionNodeTransformer.java stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXStageNodeTransformer.java |
diffstat | 3 files changed, 533 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
// Source: stress-tester/src/main/java/com/passus/st/filter/SequenceFilterX.java
package com.passus.st.filter;

import com.passus.commons.Assert;
import com.passus.commons.metric.MapMetric;
import com.passus.commons.time.SystemTimeGenerator;
import com.passus.commons.time.TimeAware;
import com.passus.commons.time.TimeGenerator;
import com.passus.config.Configuration;
import com.passus.config.ConfigurationContext;
import com.passus.config.schema.*;
import com.passus.filter.ValueExtractor;
import com.passus.st.client.FlowContext;
import com.passus.st.config.HeaderOperationNodeDefinition;

import java.io.Serializable;
import java.util.*;
import java.util.function.Predicate;

import static com.passus.config.schema.ConfigurationSchemaBuilder.*;
import static com.passus.st.config.CommonNodeDefs.*;

/**
 * Flow filter that follows ordered sequences of messages (work in progress).
 *
 * <p>A sequence is a linked list of {@link Stage}s. Every message matching the
 * first (root) stage opens a new chain; subsequent messages move each open
 * chain from stage to stage. When a chain passes its last stage, all
 * configured {@link Action}s are executed for it.
 *
 * <p>The filter keeps its open chains in a plain {@link ArrayList} and does no
 * synchronization of its own.
 */
public class SequenceFilterX implements FlowFilter, TimeAware {

    public static final String TYPE = "sequenceX";

    private static final int DEFAULT_CHAINS_NUM = 32;

    /** Prefix of the automatic, index based alias given to every stage. */
    private static final String STAGE_PREFIX = "_i";

    private TimeGenerator timeGenerator = new SystemTimeGenerator();

    /** Chains that have passed the root stage and are waiting for later stages. */
    private final List<StageChain> chains = new ArrayList<>(DEFAULT_CHAINS_NUM);

    private final FlowFilterFactory filterFactory;

    private Stage rootStage;

    // NOTE(review): the schema built by NodeDefCreator declares an optional "dir"
    // key, but configure() never reads it, so this always stays IN - confirm how
    // the direction is supposed to be obtained from Configuration.
    private FilterDirection direction = FilterDirection.IN;

    private Action[] actions;

    public SequenceFilterX() {
        this(FlowFilterFactory.DEFAULT_FACTORY);
    }

    public SequenceFilterX(FlowFilterFactory filterFactory) {
        Assert.notNull(filterFactory, "filterFactory");
        this.filterFactory = filterFactory;
    }

    /**
     * Installs the sequence definition.
     *
     * @param stages  ordered stages of the sequence; at least two, none null
     * @param actions actions executed when a chain completes; at least one
     * @throws IllegalArgumentException if there are fewer than two stages or no actions
     * @throws NullPointerException     if one of the stages is null
     */
    public void init(final Stage[] stages, final Action[] actions) {
        Assert.notNull(stages, "stages");
        Assert.notNull(actions, "actions");
        if (stages.length < 2) {
            throw new IllegalArgumentException("At least two Stage required.");
        }
        // The original tested stages.length == 0 here, which is unreachable after
        // the check above; the message shows that the actions were meant.
        if (actions.length == 0) {
            throw new IllegalArgumentException("At least one Action required.");
        }

        // Validate everything first so that a failure leaves no stage half-linked.
        for (Stage stage : stages) {
            if (stage == null) {
                throw new NullPointerException("One Stage is null.");
            }
        }

        for (int i = 0; i < stages.length; i++) {
            stages[i].index(i);
            if (i > 0) {
                stages[i - 1].next = stages[i];
            }
        }
        // Make sure the sequence really ends here, even if the Stage was linked before.
        stages[stages.length - 1].next = null;

        this.rootStage = stages[0];
        this.actions = actions;
    }

    /**
     * Reads the stages and actions from the configuration and passes them to
     * {@link #init(Stage[], Action[])}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void configure(Configuration config, ConfigurationContext context) {
        List<Stage> stagesList = (List<Stage>) config.get("sequence");

        // The schema declares the key as "action"; "actions" (the key read
        // originally) is still honoured as a fallback.
        Object rawActions = config.get("action");
        if (rawActions == null) {
            rawActions = config.get("actions");
        }
        List<Action> actionsList = rawActions == null
                ? Collections.<Action>emptyList()
                : (List<Action>) rawActions;

        init(stagesList.toArray(new Stage[0]), actionsList.toArray(new Action[0]));
    }

    @Override
    public TimeGenerator getTimeGenerator() {
        return timeGenerator;
    }

    @Override
    public void setTimeGenerator(TimeGenerator generator) {
        Assert.notNull(generator, "generator");
        this.timeGenerator = generator;
    }

    @Override
    public FlowFilter instanceForWorker(int index) {
        throw new RuntimeException("Not implemented.");
    }

    /** Executes every configured action for a completed chain. */
    private void fireAction(StageChain chain, FlowContext flowContext) {
        for (Action action : actions) {
            action.execute(chain, flowContext);
        }
    }

    /**
     * Leaves the current stage of the chain.
     *
     * @param value message to remember under the aliases of the stage being
     *              left, or null to remember nothing
     * @return true if the chain moved to a following stage; false if it was at
     *         its last stage, in which case the actions have been fired
     */
    private boolean nextStage(StageChain chain, long now, MessageWrapper value, FlowContext context) {
        if (value != null) {
            chain.updateOldValue(value, chain.stage.getAliases());
        }

        if (chain.stage.hasNext()) {
            chain.setStage(now, chain.stage.next);
            return true;
        }

        fireAction(chain, context);
        return false;
    }

    /**
     * Applies one message to a single open chain.
     *
     * @return true if the chain stays open, false if it has to be removed
     *         (it expired on a mandatory stage or it has completed)
     */
    private boolean advance(StageChain chain, long now, MessageWrapper value, FlowContext context) {
        for (; ; ) {
            if (chain.expire < now) {
                if (chain.stage.mustOccur) {
                    return false;
                }
                if (!nextStage(chain, now, value, context)) {
                    return false;
                }
                // The chain moved on; evaluate the same message against the new stage.
                continue;
            }

            if (chain.stage.mustOccur) {
                if (chain.stage.match(chain.mergeValue(value))) {
                    return nextStage(chain, now, value, context);
                }
                return true;
            }

            return nextStage(chain, now, null, context);
        }
    }

    /**
     * Feeds a message to all open chains and then, if it matches the root
     * stage, opens a new chain for it.
     */
    private void processValue(long now, MessageWrapper value, FlowContext context) {
        // Each chain is removed at most once per iterator step. The original loop
        // called it.remove() and then re-entered the loop body for the same chain,
        // so the second remove() threw IllegalStateException.
        for (Iterator<StageChain> it = chains.iterator(); it.hasNext(); ) {
            if (!advance(it.next(), now, value, context)) {
                it.remove();
            }
        }

        final Map<String, Object> valueMap = new HashMap<>(2);
        valueMap.put("req", value.getReq());
        valueMap.put("resp", value.getResp());
        if (rootStage.match(valueMap)) {
            StageChain chain = new StageChain(valueMap);
            chain.setStage(now, rootStage);
            if (nextStage(chain, now, value, context)) {
                chains.add(chain);
            }
        }
    }

    /** Wraps the message and processes it if the given direction is enabled. */
    private void filter(FilterDirection accepted, Object req, Object resp, FlowContext context) {
        if (direction == FilterDirection.BOTH || direction == accepted) {
            MessageWrapper wrapper = filterFactory.createWrapper(req, resp, context);
            processValue(timeGenerator.currentTimeMillis(), wrapper, context);
        }
    }

    @Override
    public int filterOutbound(Object req, Object resp, FlowContext context) {
        filter(FilterDirection.OUT, req, resp, context);
        return DUNNO;
    }

    @Override
    public int filterInbound(Object req, Object resp, FlowContext context) {
        filter(FilterDirection.IN, req, resp, context);
        return DUNNO;
    }

    /**
     * One step of a sequence: a predicate plus the time, in the units of the
     * filter's time generator, that a chain may stay on the step.
     */
    public static class Stage {

        private final Predicate predicate;

        private final boolean mustOccur;

        private final long time;

        private final String alias;

        /** Slot 0: index based alias set by {@link #index(int)}; slot 1: user alias, if any. */
        private final String[] aliases;

        private Stage next;

        public Stage(Predicate predicate, long time, boolean mustOccur, String alias) {
            this.predicate = predicate;
            this.time = time;
            this.mustOccur = mustOccur;
            this.alias = alias;

            if (alias != null) {
                aliases = new String[2];
                aliases[1] = alias;
            } else {
                aliases = new String[1];
            }
        }

        private void index(int index) {
            aliases[0] = STAGE_PREFIX + index;
        }

        public Predicate getPredicate() {
            return predicate;
        }

        public boolean isMustOccur() {
            return mustOccur;
        }

        public long getTime() {
            return time;
        }

        public String getAlias() {
            return alias;
        }

        private boolean hasNext() {
            return next != null;
        }

        @SuppressWarnings("unchecked") // predicate is a raw type by public contract
        private boolean match(Map<String, Object> value) {
            return predicate.test(value);
        }

        private String[] getAliases() {
            return aliases;
        }
    }

    /**
     * State of one sequence being followed: the stage it waits on, the deadline
     * of that stage and the values remembered so far.
     *
     * <p>Static because it needs nothing from the enclosing filter instance.
     */
    private static final class StageChain {

        private Stage stage;

        /** "req"/"resp" of the message that opened the chain plus messages stored under stage aliases. */
        private final Map<String, Object> valueMap;

        private long expire;

        public StageChain(Map<String, Object> valueMap) {
            this.valueMap = valueMap;
        }

        public void setStage(long now, Stage stage) {
            this.expire = now + stage.time;
            this.stage = stage;
        }

        /**
         * Builds the map a stage predicate is evaluated against: everything the
         * chain has remembered, with "req" and "resp" replaced by those of the
         * new message. The chain's own map is not modified.
         *
         * <p>The original returned only "req"/"resp" and merged nothing.
         */
        public Map<String, Object> mergeValue(MessageWrapper newValue) {
            Map<String, Object> out = new HashMap<>(valueMap);
            out.put("req", newValue.getReq());
            out.put("resp", newValue.getResp());
            return out;
        }

        /** Remembers the message under each of the given aliases. */
        public void updateOldValue(MessageWrapper newValue, String[] aliases) {
            for (String alias : aliases) {
                valueMap.put(alias, newValue);
            }
        }

    }

    /** Operation executed for every chain that passed its last stage. */
    public static abstract class Action {

        public abstract void execute(StageChain chain, FlowContext flowContext);

    }

    /** Builds a "sequence" metric from values extracted out of the chain (unfinished). */
    public static final class CreateMetricsAction extends Action {

        private final Map<String, ValueExtractor> values;

        public CreateMetricsAction() {
            this(Collections.<String, ValueExtractor>emptyMap());
        }

        public CreateMetricsAction(Map<String, ValueExtractor> values) {
            this.values = values;
        }

        @Override
        public void execute(StageChain chain, FlowContext flowContext) {
            Map<String, Serializable> result;

            if (values.isEmpty()) {
                result = Collections.emptyMap();
            } else {
                result = new HashMap<>();
                Map<String, Object> persisted = chain.valueMap;
                for (Map.Entry<String, ValueExtractor> e : values.entrySet()) {
                    Object value = e.getValue().extract(persisted);
                    // Values that are not Serializable are skipped.
                    if (value instanceof Serializable) {
                        result.put(e.getKey(), (Serializable) value);
                    }
                }
            }

            // TODO: the metric is built but not yet delivered anywhere.
            MapMetric metric = new MapMetric("sequence", result);
            throw new RuntimeException("Not implemented.");
        }
    }

    /** Carries a multiplier; executing it is not implemented yet. */
    public static final class MultiplicationAction extends Action {

        private final int multiplier;

        public MultiplicationAction(int multiplier) {
            Assert.greaterThanZero(multiplier, "multiplier");
            // The original initialised the field to 1 at its declaration and never
            // stored the argument, so getMultiplier() always returned 1.
            this.multiplier = multiplier;
        }

        public int getMultiplier() {
            return multiplier;
        }

        @Override
        public void execute(StageChain chain, FlowContext flowContext) {
            throw new RuntimeException("Not implemented.");
        }
    }

    /** Builds the configuration schema of the filter. */
    public static class NodeDefCreator implements NodeDefinitionCreator {

        @Override
        public NodeDefinition create() {
            MapNodeDefinition stageDef = mapDef(
                    tupleDef("match", MSG_PREDICATE_DEF),
                    tupleDef("mustOccur", BOOLEAN_DEF).setRequired(false),
                    tupleDef("time", LONG_GREATER_EQUAL_ZERO_DEF).setRequired(false),
                    tupleDef("alias", STRING_DEF).setRequired(false)
            );
            stageDef.setTransformer(new SequenceFilterXStageNodeTransformer());

            HeaderOperationNodeDefinition metricActionDef = new HeaderOperationNodeDefinition(
                    valueDef()
            );

            KeyNameVaryListNodeDefinition actionsDef = new KeyNameVaryListNodeDefinition()
                    .setNodeTransformer(new SequenceFilterXActionNodeTransformer())
                    .add("metric", metricActionDef)
                    .add("multiplication", INT_GREATER_THAN_ZERO_DEF);

            return mapDef(
                    tupleDef("dir", enumDef(FilterDirection.class)).setRequired(false),
                    tupleDef("sequence", new ListNodeDefinition(stageDef).setMinNumberOfValues(2)),
                    tupleDef("action", actionsDef).setRequired(false)
            );
        }

    }
}
// Source: stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXActionNodeTransformer.java
package com.passus.st.filter;

import com.passus.config.*;
import com.passus.config.schema.NodeTransformer;
import com.passus.config.validation.Errors;
import com.passus.filter.ValueExtractor;
import com.passus.filter.config.PredicateNodeTransformer;
import com.passus.filter.config.ValueExtractorTransformer;
import com.passus.st.filter.SequenceFilterX.Action;

import java.util.*;

import static com.passus.config.ConfigurationUtils.extractInteger;
import static com.passus.st.validation.NodeValidationUtils.validateType;

/**
 * Turns the action map of a {@code sequenceX} filter definition into a list of
 * {@link Action}s wrapped in a {@link CValueNode}.
 *
 * <p>Supported action names, compared case-insensitively, are
 * {@code multiplication} and {@code metric}.
 */
public class SequenceFilterXActionNodeTransformer implements NodeTransformer<CNode> {

    /**
     * Converts every tuple of the given map node into an action.
     *
     * @param node    the action map; cast to {@link CMapNode}
     * @param errors  error collector; each action is processed under a nested
     *                path named after the action
     * @param context configuration context used to build the value extractors
     * @return a value node holding the list of actions, or null once an error
     *         has been recorded
     * @throws IllegalStateException if an action name is not supported
     */
    @Override
    public CValueNode transform(CNode node, Errors errors, ConfigurationContext context) {
        CMapNode mapNode = (CMapNode) node;

        List<CTupleNode> tuples = mapNode.getChildren();
        List<Action> actions = new ArrayList<>(tuples.size());

        final ValueExtractorTransformer transformer = Transformers.fieldValueExtractorTransformer(context);
        for (CTupleNode tuple : tuples) {
            String actionName = tuple.getName();
            // Push before entering try, so the pop in finally always has a matching push.
            errors.pushNestedPath(actionName);
            try {
                Action action;
                // Locale.ROOT keeps the lookup independent of the default locale
                // (e.g. the Turkish dotless i).
                switch (actionName.toLowerCase(Locale.ROOT)) {
                    case "multiplication":
                        action = createMultiplicationAction(tuple.getNode(), errors);
                        break;
                    case "metric":
                        action = createMetricAction(tuple.getNode(), transformer, errors, context);
                        break;
                    default:
                        throw new IllegalStateException("Not supported action '" + actionName + "'.");
                }

                if (errors.hasError()) {
                    return null;
                }

                if (action != null) {
                    actions.add(action);
                }
            } finally {
                errors.popNestedPath();
            }
        }

        return new CValueNode(actions);
    }

    /** Creates the multiplication action, or returns null if an error has been recorded. */
    private static Action createMultiplicationAction(CNode node, Errors errors) {
        int multiplier = extractInteger(node, errors);
        // Check for a recorded error before the constructor validates the value
        // itself (presumably by throwing - see Assert.greaterThanZero).
        if (errors.hasError()) {
            return null;
        }

        return new SequenceFilterX.MultiplicationAction(multiplier);
    }

    /**
     * Creates the metric action from a map of name to value expression.
     * Returns null if the node is not a map or an error has been recorded.
     */
    private static Action createMetricAction(CNode node, ValueExtractorTransformer transformer,
                                             Errors errors, ConfigurationContext context) {
        if (!validateType(node, NodeType.MAP, errors)) {
            return null;
        }

        CMapNode metricNode = (CMapNode) node;
        Map<String, ValueExtractor> values = new HashMap<>(metricNode.size());
        for (CTupleNode entry : metricNode.getChildren()) {
            String paramName = entry.getName();
            errors.pushNestedPath(paramName);
            try {
                ValueExtractor valueExtractor = transformer.transform(entry.getNode(), errors, context);
                if (errors.hasError()) {
                    return null;
                }

                values.put(paramName, valueExtractor);
            } catch (Exception e) {
                errors.reject(entry.getNode(), "Invalid expression.");
            } finally {
                errors.popNestedPath();
            }
        }

        return new SequenceFilterX.CreateMetricsAction(values);
    }

    @Override
    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}
// Source: stress-tester/src/main/java/com/passus/st/filter/SequenceFilterXStageNodeTransformer.java
package com.passus.st.filter;

import com.passus.config.*;
import com.passus.config.schema.NodeTransformer;
import com.passus.config.validation.Errors;
import com.passus.filter.config.PredicateNodeTransformer;
import com.passus.st.filter.SequenceFilterX.Stage;

import java.util.List;
import java.util.function.Predicate;

/**
 * Turns one stage map of a {@code sequenceX} filter definition (keys
 * {@code match}, {@code mustOccur}, {@code time}, {@code alias}) into a
 * {@link Stage} wrapped in a {@link CValueNode}.
 */
public class SequenceFilterXStageNodeTransformer implements NodeTransformer<CNode> {

    private static final PredicateNodeTransformer TRANSFORMER = Transformers.PREDICATE;

    /**
     * Builds a {@link Stage} from the given map node.
     *
     * <p>Defaults for absent keys: {@code mustOccur} = true, {@code time} = 0,
     * {@code alias} = null. Keys other than the four known ones are ignored.
     *
     * @return a value node holding the stage; if the stage cannot be built, an
     *         error is recorded in {@code errors} and the original node is
     *         returned unchanged
     */
    @Override
    public CNode transform(CNode node, Errors errors, ConfigurationContext context) {
        try {
            CMapNode mapNode = (CMapNode) node;
            List<CTupleNode> tupleNodes = mapNode.getChildren();

            Predicate predicate = null;
            boolean mustOccur = true;
            long time = 0;
            String alias = null;
            for (CTupleNode tupleNode : tupleNodes) {
                String name = tupleNode.getName();
                CNode valueNode = tupleNode.getNode();
                switch (name) {
                    case "match":
                        predicate = TRANSFORMER.transform(valueNode);
                        break;
                    case "mustOccur":
                        mustOccur = (Boolean) getValue(valueNode);
                        break;
                    case "time":
                        // Accept any numeric type, not only Long.
                        time = ((Number) getValue(valueNode)).longValue();
                        break;
                    case "alias":
                        alias = (String) getValue(valueNode);
                        break;
                    default:
                        break;
                }
            }

            // Stage.match() calls the predicate unconditionally, so a stage
            // without one would only fail later, while filtering.
            if (predicate == null) {
                errors.reject(node, "Stage requires a 'match' predicate.");
                return node;
            }

            return new CValueNode(new Stage(predicate, time, mustOccur, alias));
        } catch (Exception ex) {
            // Report the failure instead of silently passing the raw node on.
            errors.reject(node, "Invalid stage definition.");
            return node;
        }
    }

    @Override
    public CNode reverseTransform(CNode node, Errors errors, ConfigurationContext context) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /** Returns the value held by a node that is expected to be a {@link CValueNode}. */
    private static Object getValue(CNode node) {
        CValueNode valueNode = (CValueNode) node;
        return valueNode.getValue();
    }
}