Mercurial > stress-tester
changeset 1186:8b605b57e68d
flow analyzer - in progress
author | Devel 1 |
---|---|
date | Wed, 17 Jun 2020 13:33:53 +0200 |
parents | a8cd127261a8 |
children | 013fb325d040 |
files | stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java stress-tester/src/main/resources/flow_analyzer.py stress-tester/src/test/java/com/passus/st/scanner/FlowAnalyzerCommandTest.java |
diffstat | 5 files changed, 711 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Wed Jun 17 12:39:18 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java Wed Jun 17 13:33:53 2020 +0200 @@ -6,6 +6,8 @@ /** * @author Mirosław Hawrot + * @param <R> + * @param <S> */ public class SessionPayloadEvent<R, S> extends SessionEvent {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java Wed Jun 17 13:33:53 2020 +0200 @@ -0,0 +1,96 @@ +package com.passus.st.scanner; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; + +/** + * + * @author mikolaj.podbielski + */ +public class FlowAnalyzerCommand { + + public static final boolean IS_WINDOWS = System.getProperty("os.name").contains("Windows"); + public static final String DEFAULT_PYTHON_CMD = IS_WINDOWS ? "python" : "python3"; + public static final String DEFAULT_SCRIPT_PATH = "flow_analyzer.py"; + public static final String RESOURCE = "/flow_analyzer.py"; + + private String pythonCmd = DEFAULT_PYTHON_CMD; + private String scriptPath = DEFAULT_SCRIPT_PATH; + private String dataPath; + + static void extractEmbeddedScript(File target) throws IOException { + target.delete(); + extractResource(RESOURCE, target); + } + + static void extractResource(String resource, File dest) throws IOException { + try (InputStream is = FlowAnalyzerCommand.class.getResourceAsStream(resource);) { + Path target = Paths.get(dest.toURI()); + Files.copy(is, target); + } + } + + void run() throws IOException, InterruptedException { + checkExists(scriptPath, "scriptPath"); + checkExists(dataPath, "dataPath"); + checkCmd(pythonCmd); + + List<String> commandLine = Arrays.asList(pythonCmd, scriptPath, dataPath); + ProcessBuilder pb = new ProcessBuilder(commandLine); + + boolean showOutput = true; + if (showOutput) { + pb.redirectErrorStream(true); + pb.inheritIO(); + } + + Process process = pb.start(); + process.waitFor(); + } + + static void checkCmd(String cmd) throws IOException, InterruptedException { + try { + List<String> commandLine = Arrays.asList(cmd, "--version"); + ProcessBuilder pb = new ProcessBuilder(commandLine); + Process process = pb.start(); + process.waitFor(); + } catch (Exception ex) { + throw new IllegalArgumentException("Cannot launch command"); + } + } + + static void checkExists(String path, String name) { + if (path == null) { + throw new IllegalArgumentException("Argument " + name + " must be not null."); + } + File file = new File(path); + if (!file.exists()) { + throw new IllegalArgumentException("File " + file.getAbsolutePath() + " does not exist."); + } + } + + public static void main(String[] args) { + FlowAnalyzerCommand cmd = new FlowAnalyzerCommand(); + + int idx = 0; + switch (args.length) { + case 3: + cmd.pythonCmd = args[idx++]; + case 2: + cmd.scriptPath = args[idx++]; + case 1: + cmd.dataPath = args[idx++]; + break; + default: + throw new IllegalArgumentException(); + } + + } + +}
--- a/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java Wed Jun 17 12:39:18 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java Wed Jun 17 13:33:53 2020 +0200 @@ -1,28 +1,53 @@ package com.passus.st.scanner; import com.passus.commons.annotations.Plugin; +import com.passus.config.Configurable; +import com.passus.config.Configuration; +import com.passus.config.ConfigurationContext; +import com.passus.config.annotations.NodeDefinitionCreate; +import com.passus.config.schema.NodeDefinition; +import com.passus.config.schema.NodeDefinitionCreator; +import com.passus.net.http.HttpHeaders; import com.passus.net.http.HttpRequest; +import com.passus.net.http.HttpResponse; import com.passus.st.client.DataEvents; import com.passus.st.client.Event; import com.passus.st.client.SessionPayloadEvent; import com.passus.st.metric.MetricSource; import com.passus.st.metric.MetricsContainer; import com.passus.st.plugin.PluginConstants; +import java.text.SimpleDateFormat; +import java.util.Date; +import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; +import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; import static com.passus.st.Protocols.HTTP; +import static com.passus.st.config.CommonNodeDefs.STRING_DEF; @Plugin(name = HttpUrlSequencePayloadAnalyzer.TYPE, category = PluginConstants.CATEGORY_SCANNER_ANALYZER) -public class HttpUrlSequencePayloadAnalyzer implements ScannerAnalyzer, MetricSource { +@NodeDefinitionCreate(HttpUrlSequencePayloadAnalyzer.NodeDefCreator.class) +public class HttpUrlSequencePayloadAnalyzer implements ScannerAnalyzer, MetricSource, Configurable { public static final String TYPE = "httpUrlSequence"; private HttpUrlSequences metric; + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss"); + + private UserIdExtractor userIdExtractor; + public String getType() { return TYPE; } @Override + public void configure(Configuration config, ConfigurationContext context) { + String userIdSource = config.getString("userIdSource"); + userIdExtractor = resolveUserIdExtractor(userIdSource); + System.out.println(""); + } + + @Override public void activate() { metric = new HttpUrlSequences(); metric.activate(); @@ -54,12 +79,12 @@ if (event.getType() == SessionPayloadEvent.TYPE) { write((SessionPayloadEvent) event); } else if (event.getType() == DataEvents.DataEnd.TYPE) { - + populateMetric(); } } private void populateMetric() { - + System.out.println(""); } private void write(SessionPayloadEvent event) { @@ -68,10 +93,47 @@ } HttpRequest req = (HttpRequest) event.getRequest(); + HttpResponse resp = (HttpResponse) event.getResponse(); if (req != null) { + String uri = req.getUri().toString(); + int qidx = uri.indexOf('?'); + String path; + String query; + if (qidx < 0) { + path = uri; + query = null; + } else { + path = uri.substring(0, qidx); + } + + String method = req.getMethod().toString(); + String datetime = sdf.format(new Date(req.getTimestamp())); + String userId = userIdExtractor.extract(event); + String userAgent = req.getHeaders().get(HttpHeaders.USER_AGENT).toString(); // unused + String status = resp == null ? "" : resp.getStatus().toString(); // unused + String protocol = Integer.toString(req.getVersion()); // unused, eg. "HTTP/1.1" +//pathBase pathArgs request_method datetime id agent status protocol } } -} \ No newline at end of file + public interface UserIdExtractor { + String extract(SessionPayloadEvent event); + } + + static UserIdExtractor resolveUserIdExtractor(String userIdSource) { + return (event) -> "id"; + } + + public static class NodeDefCreator implements NodeDefinitionCreator { + + @Override + public NodeDefinition create() { + return mapDef( + tupleDef("userIdSource", STRING_DEF).setRequired(false) + ); + } + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/main/resources/flow_analyzer.py Wed Jun 17 13:33:53 2020 +0200 @@ -0,0 +1,514 @@ +import pandas as pd +import re +import gc +from pathlib import Path + +from urllib import parse +from collections import Counter + +from itertools import groupby + +import ujson as json + + +class flowAnalyzer: + def get_files_list(self, pathString="", glob=False, globString="", disallowedSuffixex = [".gz", ".zip"], debug=False): + '''Funkcja, której celem jest przygotowanie listy plików z logami do wczytania + Może działać na dwa sposoby przez podanie po prostu ścieżki pliku z logami + lub przez znalezienie wszystkich plików przez glob + ''' + # Tutaj stworzona zostanie na nowo lista plików w obiekcie + self.fileList = [] + + if glob: + # Wykorzystujemy pathlib aby lepiej działać między systemowo + path = Path(pathString) + for filename in path.glob(globString): + # Upewniamy się, że plik nie ma zakazanego rozszerzenia + if filename.suffix not in disallowedSuffixex: + self.fileList.append(filename) + else: + self.fileList.append(pathString) + + if debug: + print(self.fileList) + + def extract_lines(self, disallowedStrings, debug = False): + ''' + Czytanie wszystkich plików linia po linii. + Dodajemy sobie tylko proste warunki sprawdzenia poprawności linii przez zawieranie się stringów + ''' + # nie robimy self.lines, żeby nie marnować pamięci przy większych plikach i nie musieć explicite kasować + lines = [] + skippedLines = 0 + for file in self.fileList: + with open(file) as fp: + for line in fp: + # Sprawdzamy czy ta linijka nie zawiera jakiś "niedozwolonych fragmentów" + # UWAGA, poniższy if na pewno nie jest królem wydajności + if any([x in line for x in disallowedStrings]): + skippedLines +=1 + continue + lines.append(line.rstrip('\n')) + if debug: + print("Liczba pominiętych linii:", skippedLines) + return lines + + def get_events_line_regex(self, rePatter, disallowedStrings = ["system overloaded"], + dateFormat = "%d/%b/%Y:%H:%M:%S", debug = False): + ''' + Funkcja, której celem jest parsowanie logów z założeniem, że + możemy przejść regexem grupowym każdą linijkę każdego pliku na naszej liści + + UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. + Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. + Wszystko jedno jak to będzie w praktyce. + ''' + lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) + + pat = re.compile(rePatter) + rows = [] + for line in lines: + row = re.match(pat, line) + tdict = row.groupdict() + rows.append([tdict["id"], tdict["datetime"], tdict["requestMethod"], tdict["url"], tdict["protocol"], tdict["status"], tdict["agent"]]) + self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"]) + + # Linijskan ap otrzeby szybszych testów do skasowania + self.data = self.data.iloc[0:10000] + + # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować + self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat) + if debug: + print("Wczytana liczba wierszy i kolumn: ", self.data.shape) + def get_events_line_json(self, disallowedStrings = ["system overloaded"], + dateFormat = None, columnMap = {}, debug = False): + ''' + Funkcja, której celem jest parsowanie logów z założeniem, że na początku jest data, a dalej spacją rozdzielone datetime + + UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. + Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. + Wszystko jedno jak to będzie w praktyce. + ''' + lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) + + rows = [] + for line in lines: + line = json.loads(line) + try: + line["httpreqheaders"] = line["httpreqheaders"][re.search("SESSION=", line["httpreqheaders"]).end():] + line["httpreqheaders"] = line["httpreqheaders"][:re.search("; ", line["httpreqheaders"]).start()] + except: + if debug: + print(line) + continue + rows.append([ + line[columnMap.get("id", "id")], + line[columnMap.get("datetime", "datetime")], + line[columnMap.get("requestMethod", "requestMethod")], + line[columnMap.get("url", "url")], + line[columnMap.get("protocol", "protocol")], + line[columnMap.get("status", "status")], + line[columnMap.get("agent", "agent")] + ]) + + self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"]) + + # Linijskan ap otrzeby szybszych testów do skasowania + # self.data = self.data.iloc[0:10000] + + # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować + self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat) + if debug: + print("Wczytana liczba wierszy i kolumn: ", self.data.shape) + + def get_data_excel(self, filename="", columnMap = {}): + df = pd.read_excel(filename) + df = df.rename(columns=columnMap) + df = df[columnMap.values()] + df["datetime"] = pd.to_datetime(df.datetime) + self.data = df + + def prep_data(self, cleanRows = True, cleanRules = None, dropMissing = True, debug=False): + if cleanRules is None: + self.cleanRules = [ + # Zasoby doatkowe typu css/js + ".css", ".js", + # Elementy graficzne + ".jpg", ".png", ".svg", ".gif", ".ico", + # Czcionki + ".ttf", ".woff", ".woff2", ".eot", + # Dokumenty + ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt", + # Inne elementy jak np strony sprawdzające captch + "captcha" + ] + else: + self.cleanRules = cleanRules + + + # Rozbijamy ścieżkę na abzowy url i argumenty przez znak "?" + + temp = self.data['url'].str.split('?', 1, expand=True) + + + # Konwersja na stringi + self.data["pathBase"] = temp[0].astype(str) + if temp.shape[1]<2: + self.data["pathArgs"] = "" + else: + self.data["pathArgs"] = temp[1].fillna("").astype(str) + + + # Proste czyszczenie i normalizowanie stringów (url, request method) + self.data["pathArgs"] = self.data["pathArgs"].str.lower().str.strip() + self.data["pathBase"] = self.data["pathBase"].str.lower().str.strip() + self.data["requestMethod"] = self.data["requestMethod"].str.lower().str.strip() + + if dropMissing: + shapeBefore = self.data.shape + self.data = self.data.dropna(subset=["id", "datetime", "pathBase"]) + if debug: + print("Rozmiar przed missingami:", shapeBefore) + print("Rozmiar po missingach:", self.data.shape) + + if cleanRows: + shapeBefore = self.data.shape + idsToRemove = [] + if debug: + print("Liczba wierszy spełniających regułę:") + for rule in self.cleanRules: + # UWAGA + # Poniższa linia nie jest na pewno bardzo wydajna, ale jest prosta i szybka + # Sposób czyszczenia to detal który jest do dopracowania / zmiany wedle uznania + temp = self.data.loc[self.data.pathBase.str.contains(rule, case=False, regex=False)].index.tolist() + if debug: + print(rule, len(temp)) + idsToRemove.extend(temp.copy()) + self.data = self.data.drop(idsToRemove) + + if debug: + print("Rozmiar przed czyszczeniem:", shapeBefore) + print("Rozmiar po czyszczeniu:", self.data.shape) + + def clean_base(self, maxPathDepth=False, whiteListParts = False): + if maxPathDepth: + splitted = [x.split("/")[0:maxPathDepth] for x in self.data["pathBase"]] + # Rozbijam na potrzeby czytelności i ewentualnie wydajności. + # Jeżeli jest max to najpierw go ograniczamy, jezlei robimy whitelist, ale bez max to tez musimy splitować + # Jeżeli jest tylko whitelist to nie znamy wartosci maxPathDepth + # Można tez przyjąć maxPathDept 1000 jako wartośc domyślną i mieć w jednej linii + elif whiteListParts: + splitted = [x.split("/") for x in self.data["pathBase"]] + + # Jeżeli white list to filtrujemy + if whiteListParts: + # splitted to lista list + # ulr to element splitted + # part to element url + splitted = [part for part in url if part in whiteListParts for url in splitted] + + # Musimy teraz "skleić" + if maxPathDepth or whiteListParts: + filteredBase = ["/".join(parts) for parts in splitted] + else: + filteredBase = self.data["pathBase"].tolist() + return filteredBase + + def clean_args(self, whiteListArgs = False, blacklistArgs=False): + + args = self.data["pathArgs"].map(parse.parse_qsl) + if whiteListArgs: + args1 = [] + for argList in args: + args1.append([(x,y) for x,y in argList if x in whiteListArgs]) + # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny + # Możliwe też, że w implementacje w jave whitelist będzie to wczesniej już na etapie parsowania + + args = args1 + elif blacklistArgs: + args1 = [] + for argList in args: + args1.append([(x,y) for x,y in argList if x not in blacklistArgs]) + # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny + # Możliwe też, że w implementacje w jave blacklist będzie to wczesniej już na etapie parsowania + + args = args1 + return args + + def explore(self, topN = 20, maxPathDepth=False, whiteListParts = False, whiteListArgs = False, blacklistArgs=False): + ''' + Zwraca statystyki dotyczące ścieżek, argumentów i wartości. + Pozwala zrozumieć co powinniśmy rozumieć przez unikatowy "krok" i z jakimi ustawieniami dokonać analizy + + maxPathDepth : mówi jak wiele elementów urla bierzemy pod uwagę (jaka maksymalna głebokość) + allowedParts : mówi jakie elementy urla są dozwolone. + + Filtrowanie argumentów pozwala na to aby nie brać pod uwage jakiś argumentów w analizie lub brać tylko wybrane + Jeżeli whiteListArgs jest zbiorem lub listą, to blacklistArgs nie jest brane pod uwagę + ''' + + filteredBase = self.clean_base(maxPathDepth, whiteListParts) + + countBase = sorted(list(Counter(filteredBase).items()), key=lambda x: x[1], reverse=True) + print("Top base url:") + for url in countBase[0:topN]: + print(url) + + args = self.clean_args(whiteListArgs, blacklistArgs) + + + urlArgs = [] + for url, argList in zip(filteredBase, args): + urlArgs.append(url+"|"+"_".join(sorted([x for x,y in argList]))) + + countUrlArg = sorted(list(Counter(urlArgs).items()), key=lambda x: x[1], reverse=True) + print("\n\nTop base url with args") + for value in countUrlArg[0:topN]: + print(value) + + + argPairs = sum(args, []) + argsOnly = [x for x,y in argPairs] + + countArgs = sorted(list(Counter(argsOnly).items()), key=lambda x: x[1], reverse=True) + print("\n\nTop args:") + for value in countArgs[0:topN]: + print(value) + + countPairs = sorted(list(Counter(argPairs).items()), key=lambda x: x[1], reverse=True) + print("\n\nTop pairs") + for value in countPairs[0:topN]: + print(value) + + def prep_sequences(self, useId=True, + useBase=True, maxPathDepth=False, whiteListParts = False, + useArgs = False, whiteListArgs = False, blacklistArgs=False, + useValues=False, + perUser = True, maxTimeDelta = 600, dropDuplicates = False, + groupOthers = False, othersLimit = 0.01, + debug = False): + ''' + Funkcja musi przygotować identyfikator "zdarzenia". + Każde "zdarzenie" może być identyfikowane przez dowolne kombinacje urli, czesci argumentów i wartości + Tutaj można mieć jeszcze więcej pomysłów, ale chyba na tym etapie wystarczy tyle. + + useId mówi czy mamy jakieś "id" które kontrolumy i czy chcemy go używać, czy moze lecieć po wszystkim chronologicznie. + + Bo można definiować jakieś white listy (na całe fragmenty nie tylko czesci) regexy na czesci itp. + Podobnie z argumentami. + W tej chwili możemy mieć: + * base + * base+args + * args + * args+values + * base+args+values + + Przy czym argsy i partsyBasa możemy kontrolować + + perUser: czy łaczyć wszystkie zdarzenia w jedną sekwencję po czasie czy działać indywidualnie epr user. + maxTimeDelta: sekundy maksymalny czas w ramach jednego "usera", + który może minąć aby kolejne zdrzenia pozostały w jednej sekwencji + dropDuplicates: czy jeżeli w sekwencja mamy ABBDCA to zamieniamy je na ABDCA przed rozpoczeciem analizy. + Duplikaty rozumiemy jako dwa zdarzenia tego samego usera w ramac htej samej sesji czasowej. + groupOthers: Pozwala na to aby wszystkie id rzadko wystepujące zamienić na "OTHER" + othersLimit: procentowa wartość (0-1) poniżej której zamieniamy na "OTHER" + + ''' + ids = [] + + if useBase: + filteredBase = self.clean_base(maxPathDepth, whiteListParts) + else: + filteredBase = ["url" for x in self.data["pathBase"]] + + + if useArgs: + args = self.clean_args(whiteListArgs, blacklistArgs) + + if useValues: + for url, argList in zip(filteredBase, args): + ids.append(url+"|"+"_".join(sorted([x+"+"+y for x,y in argList]))) + else: + for url, argList in zip(filteredBase, args): + ids.append(url+"|"+"_".join(sorted([x for x,y in argList]))) + + else: + ids = filteredBase + + if groupOthers: + icounts = dict(Counter(ids)) + icounts = {k:v/len(ids) for k,v in icounts.items()} + ids = [x if icounts[x] > othersLimit else "OTHER" for x in ids ] + + # Mapujemy "nazwy" sekwencji na integery, żeby nei trzymać w pamięci miliona niepotrzebnych stringów + uniqueIds = sorted(list(set(ids))) + self.idsMap = {idi:i for i, idi in enumerate(uniqueIds)} + self.idsMapRev = {i:idi for i, idi in enumerate(uniqueIds)} + + # Tworzymy w naszym zbiorze data kolumnę z id sekwencji + # Zbór data zawiera informacje o czasie i "id" więc sekwencje będziemy grupować później. + self.data["idsn"] = [self.idsMap[idi] for idi in ids] + + if perUser: + # Pogrupujmy wiersze po id + groupped = self.data.groupby("id") + else: + # Jeżeli nie grupujemy to i tak stworzymy strukturę danych analogiczną + # do efektu grupowania (lista krotek z nazwą i danymi) + groupped = [("all", self.data)] + + + + # Przygotujmy sobie liste wszystkich sekwencji z uwzględnieniem usera i maxTimeDelta + allSequences = [] + for name, df in groupped: + # Sortujemy po czasie w gramach grupy ID + df = df.sort_values("datetime", ascending=True) + + # Liczymy sobie deltę czasu pomiędzy zdarzeniami + df['datetimeDelta'] = (df['datetime']-df['datetime'].shift()).fillna(pd.Timedelta(seconds=0)).dt.seconds + + for i, (seqId, timeDelta) in enumerate(zip(df["idsn"], df["datetimeDelta"])): + # Poniżej nei biję rekordu wydajności, ale czytelności: + if i ==0 : + # Jeżeli to nowa gruap danego id to tworzymy nową listę + newSeq = [seqId] + elif timeDelta < maxTimeDelta: + # Jeżeli kolejny wiersz mieści się w czasie maxTimeDelta to dodajemy do dotychczasowej listy + newSeq.append(seqId) + else: + # W przeciwnym wypadku zamykamy poprzednią listę dodając do całego zbioru + allSequences.append(newSeq) + # I od nowa rozpoczynamy sekwencję + newSeq = [seqId] + # Na koniec dodajemy ostatni element który został + allSequences.append(newSeq) + + if dropDuplicates: + # usuwanei duplikatów można by zrobić wcześniej n apotrzeby zwiększenia wydajności + allSequences = [[k for k,g in groupby(seq)] for seq in allSequences] + + if debug: + print(allSequences[0:20]) + + self.allSequences = allSequences + + def analyze_markov(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001): + ''' + Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: + historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. + + ''' + + # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia + transitions = {} + # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id + # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id + + totalNextSum = 0 + for seq in self.allSequences: + if len(seq) >= historyLength+1: + # Początek sekwencji to start + for i in range(historyLength, len(seq)): + keyName = "|".join([str(x) for x in seq[i-historyLength:i]]) + # Znajdujemy słownik z przejściami dla danej historii. + val = transitions.get(keyName, {}) + # Aktualizujemy słownik dodajć 1 + val[seq[i]] = val.get(seq[i], 0)+1 + totalNextSum +=1 + # Aktualizujemy słownik w głównym słowniku + transitions[keyName] = val + # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa + # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji + # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu + + # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii + sequences = [] + for key, value in transitions.items(): + # key to stringowe id sekwencji + # value to słownik z przejściami + # value.values() zawiera liczebności + # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach + sequences.append((key.split("|"), sum(value.values()))) + + for k in range(targetLength-historyLength): + newSequences = [] + # Pogłębiamy sekwencje probabilistycznie + for seq, number in sequences: + # + keyName = "|".join([str(x) for x in seq[-historyLength:]]) + # Znajdujemy słownik z przejściami dla danej historii. + val = transitions.get(keyName, {}) + total = sum(val.values()) + + for nextEl, numberNext in val.items(): + if (number*numberNext/total)/totalNextSum > shareLimit: + newSequences.append((seq+[str(nextEl)], number*numberNext/total)) + sequences = newSequences + + # Normalizujemy i sortujemy + self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) + self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) + self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) + return self.markovMapped + + + def analyze_markov2(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001, foldSequences=False, foldDepth=2): + ''' + Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: + historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. + + ''' + + # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia + transitions = {} + # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id + # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id + + totalNextSum = 0 + for seq in self.allSequences: + if len(seq) >= historyLength+1: + # Początek sekwencji to start + for i in range(historyLength, len(seq)): + keyName = "|".join([str(x) for x in seq[i-historyLength:i]]) + # Znajdujemy słownik z przejściami dla danej historii. + val = transitions.get(keyName, {}) + # Aktualizujemy słownik dodajć 1 + val[seq[i]] = val.get(seq[i], 0)+1 + totalNextSum +=1 + # Aktualizujemy słownik w głównym słowniku + transitions[keyName] = val + # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa + # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji + # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu + + # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii + sequences = [] + for key, value in transitions.items(): + # key to stringowe id sekwencji + # value to słownik z przejściami + # value.values() zawiera liczebności + # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach + sequences.append((key.split("|"), sum(value.values()))) + + for k in range(targetLength-historyLength): + newSequences = [] + # Pogłębiamy sekwencje probabilistycznie + for seq, number in sequences: + # + keyName = "|".join([str(x) for x in seq[-historyLength:]]) + # Znajdujemy słownik z przejściami dla danej historii. + val = transitions.get(keyName, {}) + total = sum(val.values()) + + for nextEl, numberNext in val.items(): + if (number*numberNext/total)/totalNextSum > shareLimit: + newSequences.append((seq+[str(nextEl)], number*numberNext/total)) + sequences = newSequences + + # Normalizujemy i sortujemy + self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) + self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) + self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) + return self.markovMapped \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/stress-tester/src/test/java/com/passus/st/scanner/FlowAnalyzerCommandTest.java Wed Jun 17 13:33:53 2020 +0200 @@ -0,0 +1,33 @@ +package com.passus.st.scanner; + +import com.passus.st.utils.TestResourceUtils; +import java.io.File; +import static org.testng.AssertJUnit.*; +import org.testng.annotations.Test; + +/** + * + * @author mikolaj.podbielski + */ +public class FlowAnalyzerCommandTest { + + @Test + public void testExtractEmbeddedScript() throws Exception { + File dir = TestResourceUtils.createTmpDir(); + File target = new File(dir, "script.py"); + target.deleteOnExit(); + System.out.println("Extracting to " + target.getAbsolutePath()); + FlowAnalyzerCommand.extractEmbeddedScript(target); + assertEquals(25_024, target.length()); + } + + public static void main(String[] args) throws Exception { + FlowAnalyzerCommand.checkCmd("python"); + try { + FlowAnalyzerCommand.checkCmd("no-such-python"); + fail("Should fail on non-existing command"); + } catch (IllegalArgumentException ignore) { + + } + } +}