Mercurial > stress-tester
changeset 1222:f666342e4ad9
flow - in progress
author | Devel 1 |
---|---|
date | Thu, 25 Jun 2020 10:54:28 +0200 |
parents | 5d7393e2cf94 |
children | fedf5100fdd3 |
files | stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java stress-tester/src/main/resources/flow_analyzer.py stress-tester/src/main/resources/flow_analyzer_fold.py |
diffstat | 4 files changed, 140 insertions(+), 714 deletions(-) [+] |
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java Thu Jun 25 09:44:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java Thu Jun 25 10:54:28 2020 +0200 @@ -22,12 +22,14 @@ public static final String RESOURCE = "/flow_analyzer.py"; public static final int DEFAULT_LMIN = 3; public static final int DEFAULT_LMAX = 7; + public static final boolean DEFAULT_USE_FOLD = false; String pythonCmd = DEFAULT_PYTHON_CMD; String scriptPath = DEFAULT_SCRIPT_PATH; String dataPath; int lmin = DEFAULT_LMIN; int lmax = DEFAULT_LMAX; + boolean useFold = DEFAULT_USE_FOLD; List<String> cleanRules; static void extractEmbeddedScript(File target) throws IOException { @@ -51,6 +53,9 @@ commandLine.addAll(Arrays.asList( pythonCmd, scriptPath, dataPath, "-lmin", Integer.toString(lmin), "-lmax", Integer.toString(lmax) )); + if (useFold) { + commandLine.add("--fold"); + } if (cleanRules != null) { commandLine.add("-c"); commandLine.addAll(cleanRules); @@ -89,22 +94,4 @@ } } - public static void main(String[] args) { - FlowAnalyzerCommand cmd = new FlowAnalyzerCommand(); - - int idx = 0; - switch (args.length) { - case 3: - cmd.pythonCmd = args[idx++]; - case 2: - cmd.scriptPath = args[idx++]; - case 1: - cmd.dataPath = args[idx++]; - break; - default: - throw new IllegalArgumentException(); - } - - } - }
--- a/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java Thu Jun 25 09:44:15 2020 +0200 +++ b/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java Thu Jun 25 10:54:28 2020 +0200 @@ -37,6 +37,7 @@ import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef; import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef; import static com.passus.st.Protocols.HTTP; +import static com.passus.st.config.CommonNodeDefs.BOOLEAN_DEF; import static com.passus.st.config.CommonNodeDefs.INT_GREATER_THAN_ZERO_DEF; import static com.passus.st.config.CommonNodeDefs.STRING_DEF; import java.util.List; @@ -59,6 +60,7 @@ private List<String> cleanRules; private int lmin = FlowAnalyzerCommand.DEFAULT_LMIN; private int lmax = FlowAnalyzerCommand.DEFAULT_LMAX; + private boolean useFold; private CSVWriter dataWriter; @@ -73,6 +75,7 @@ dataPath = config.getString("dataPath", "sequence_data.csv"); lmin = config.getInteger("lmin", FlowAnalyzerCommand.DEFAULT_LMIN); lmax = config.getInteger("lmax", FlowAnalyzerCommand.DEFAULT_LMAX); + useFold = config.getBoolean("useFold", FlowAnalyzerCommand.DEFAULT_USE_FOLD); cleanRules = config.getList("cleanRules"); } @@ -155,6 +158,7 @@ command.cleanRules = cleanRules; command.lmin = lmin; command.lmax = lmax; + command.useFold = useFold; command.run(); File dir = new File("."); @@ -248,6 +252,7 @@ tupleDef("userIdSource", STRING_DEF).setRequired(false), tupleDef("dataPath", STRING_DEF).setRequired(false), tupleDef("cleanRules", listDef()).setRequired(false), + tupleDef("useFold", BOOLEAN_DEF).setRequired(false), tupleDef("lmin", INT_GREATER_THAN_ZERO_DEF).setRequired(false), tupleDef("lmax", INT_GREATER_THAN_ZERO_DEF).setRequired(false) );
--- a/stress-tester/src/main/resources/flow_analyzer.py Thu Jun 25 09:44:15 2020 +0200 +++ b/stress-tester/src/main/resources/flow_analyzer.py Thu Jun 25 10:54:28 2020 +0200 @@ -12,10 +12,10 @@ class flowAnalyzer: def get_files_list(self, pathString="", glob=False, globString="", disallowedSuffixex = [".gz", ".zip"], debug=False): - '''Funkcja, której celem jest przygotowanie listy plików z logami do wczytania + """Funkcja, której celem jest przygotowanie listy plików z logami do wczytania Może działać na dwa sposoby przez podanie po prostu ścieżki pliku z logami lub przez znalezienie wszystkich plików przez glob - ''' + """ # Tutaj stworzona zostanie na nowo lista plików w obiekcie self.fileList = [] @@ -33,10 +33,10 @@ print(self.fileList) def extract_lines(self, disallowedStrings, debug = False): - ''' + """ Czytanie wszystkich plików linia po linii. Dodajemy sobie tylko proste warunki sprawdzenia poprawności linii przez zawieranie się stringów - ''' + """ # nie robimy self.lines, żeby nie marnować pamięci przy większych plikach i nie musieć explicite kasować lines = [] skippedLines = 0 @@ -55,14 +55,14 @@ def get_events_line_regex(self, rePatter, disallowedStrings = ["system overloaded"], dateFormat = "%d/%b/%Y:%H:%M:%S", debug = False): - ''' + """ Funkcja, której celem jest parsowanie logów z założeniem, że możemy przejść regexem grupowym każdą linijkę każdego pliku na naszej liści UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. Wszystko jedno jak to będzie w praktyce. - ''' + """ lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) pat = re.compile(rePatter) @@ -83,13 +83,13 @@ def get_events_line_json(self, disallowedStrings = ["system overloaded"], dateFormat = None, columnMap = {}, debug = False): - ''' + """ Funkcja, której celem jest parsowanie logów z założeniem, że na początku jest data, a dalej spacją rozdzielone datetime UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. Wszystko jedno jak to będzie w praktyce. - ''' + """ lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) rows = [] @@ -252,7 +252,7 @@ return args def explore(self, topN = 20, maxPathDepth=False, whiteListParts = False, whiteListArgs = False, blacklistArgs=False): - ''' + """ Zwraca statystyki dotyczące ścieżek, argumentów i wartości. Pozwala zrozumieć co powinniśmy rozumieć przez unikatowy "krok" i z jakimi ustawieniami dokonać analizy @@ -261,7 +261,7 @@ Filtrowanie argumentów pozwala na to aby nie brać pod uwage jakiś argumentów w analizie lub brać tylko wybrane Jeżeli whiteListArgs jest zbiorem lub listą, to blacklistArgs nie jest brane pod uwagę - ''' + """ filteredBase = self.clean_base(maxPathDepth, whiteListParts) @@ -300,10 +300,10 @@ useBase=True, maxPathDepth=False, whiteListParts = False, useArgs = False, whiteListArgs = False, blacklistArgs=False, useValues=False, - perUser = True, maxTimeDelta = 600, dropDuplicates = False, + perUser = True, maxTimeDelta = 600, dropDuplicates = False, groupOthers = False, othersLimit = 0.01, - debug = False): - ''' + debug = False): + """ Funkcja musi przygotować identyfikator "zdarzenia". Każde "zdarzenie" może być identyfikowane przez dowolne kombinacje urli, czesci argumentów i wartości Tutaj można mieć jeszcze więcej pomysłów, ale chyba na tym etapie wystarczy tyle. @@ -329,7 +329,7 @@ groupOthers: Pozwala na to aby wszystkie id rzadko wystepujące zamienić na "OTHER" othersLimit: procentowa wartość (0-1) poniżej której zamieniamy na "OTHER" - ''' + """ ids = [] if useBase: @@ -356,7 +356,7 @@ icounts = {k:v/len(ids) for k,v in icounts.items()} ids = [x if icounts[x] > othersLimit else "OTHER" for x in ids ] - # Mapujemy "nazwy" sekwencji na integery, żeby nei trzymać w pamięci miliona niepotrzebnych stringów + # Mapujemy "nazwy" sekwencji na integery, żeby nie trzymać w pamięci miliona niepotrzebnych stringów uniqueIds = sorted(list(set(ids))) self.idsMap = {idi:i for i, idi in enumerate(uniqueIds)} self.idsMapRev = {i:idi for i, idi in enumerate(uniqueIds)} @@ -373,8 +373,6 @@ # do efektu grupowania (lista krotek z nazwą i danymi) groupped = [("all", self.data)] - - # Przygotujmy sobie liste wszystkich sekwencji z uwzględnieniem usera i maxTimeDelta allSequences = [] for name, df in groupped: @@ -400,8 +398,8 @@ # Na koniec dodajemy ostatni element który został allSequences.append(newSeq) + # usuwanie duplikatów można by zrobić wcześniej na potrzeby zwiększenia wydajności if dropDuplicates: - # usuwanei duplikatów można by zrobić wcześniej n apotrzeby zwiększenia wydajności allSequences = [[k for k,g in groupby(seq)] for seq in allSequences] if debug: @@ -410,11 +408,10 @@ self.allSequences = allSequences def analyze_markov(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001): - ''' + """ Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. - - ''' + """ # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia transitions = {} @@ -468,14 +465,12 @@ self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) return self.markovMapped - - def analyze_markov2(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001, - foldSequences=False, foldDepth=2): - ''' + def analyze_markov_fold(self, historyLength=1, targetLength=5, shareLimit=0.0001, + foldLoops=False, foldMaxElements=1, debug=False): + """ Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. - - ''' + """ # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia transitions = {} @@ -484,15 +479,15 @@ totalNextSum = 0 for seq in self.allSequences: - if len(seq) >= historyLength+1: + if len(seq) >= historyLength + 1: # Początek sekwencji to start for i in range(historyLength, len(seq)): - keyName = "|".join([str(x) for x in seq[i-historyLength:i]]) + keyName = "|".join([str(x) for x in seq[i - historyLength:i]]) # Znajdujemy słownik z przejściami dla danej historii. val = transitions.get(keyName, {}) # Aktualizujemy słownik dodajć 1 - val[seq[i]] = val.get(seq[i], 0)+1 - totalNextSum +=1 + val[seq[i]] = val.get(seq[i], 0) + 1 + totalNextSum += 1 # Aktualizujemy słownik w głównym słowniku transitions[keyName] = val # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa @@ -508,30 +503,120 @@ # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach sequences.append((key.split("|"), sum(value.values()))) - for k in range(targetLength-historyLength): + self.markovLoops = [] + + # Duża pętla + for k in range(targetLength - historyLength): newSequences = [] # Pogłębiamy sekwencje probabilistycznie for seq, number in sequences: - # + # keyName = "|".join([str(x) for x in seq[-historyLength:]]) # Znajdujemy słownik z przejściami dla danej historii. val = transitions.get(keyName, {}) total = sum(val.values()) for nextEl, numberNext in val.items(): - if (number*numberNext/total)/totalNextSum > shareLimit: - newSequences.append((seq+[str(nextEl)], number*numberNext/total)) - sequences = newSequences + if (number * numberNext / total) / totalNextSum > shareLimit: + newSequences.append((seq + [str(nextEl)], number * numberNext / total)) + sequences = newSequences.copy() + if debug: + print("sequences", len(sequences), sequences[0:10]) + if foldLoops: + newSequences = [] + newLoops = [] + for seq, number in sequences: + seqIsLoop = False + for foldWidth in range(1, foldMaxElements + 1): + # Zataczamy pętlę jeżeli mamy {Z}XYX, gdzie: + # Z - cokolwiek wcześniej + # X ma szerokość historyLength + # zaś Y ma szerokość określoną w foldWidth maksymalnie foldMaxElements + if len(seq) >= (2 * historyLength + foldWidth): + # print(seq, + # seq[-(2*historyLength+foldWidth):-(historyLength+foldWidth)], + # seq[-historyLength:], + # seq[-(2*historyLength+foldWidth):-(historyLength+foldWidth)] == seq[-historyLength:]) + if seq[-(2 * historyLength + foldWidth):-(historyLength + foldWidth)] == seq[-historyLength:]: + # Jest pętle, usuwamy sekwencję (nie dodajemy) + # Sekwencję przenosimy do listę pętli + # Pierwszy element to ZXY - potrzebny do zaraportowania + # Drugi to ZX - potrzebny do modyfikacji liczebności + newLoops.append((seq.copy(), seq[:-(historyLength + foldWidth)], number)) + seqIsLoop = True + # breakujemy fora bo jak jest lupem n to nei ma sensu szukać n+1 + break + if not seqIsLoop: + # Nie ma pętli zostawiamy sekwencję nieruszoną + newSequences.append((seq, number)) + + # W tym miejscu każda sekwencja z sequences trafia albo na listę newSequences albo newLoops + # Poniżej robimy "dodawanie", którego nei trzeba robić jeżeli nei ma pętli + if len(newLoops) == 0: + # Jeżeli to spełnione to ekwiwalent zakończ iteracji fora z Duża pętla + # Jeżeli nie znaleźlismy żadnej pętli to nie ma co mielić + # Przechodzimy do kolejnej iteracji + sequences = newSequences.copy() + continue + + if debug: + print("New loops", len(newLoops), newLoops[0:10]) + print("sequences", len(sequences), sequences[0:10]) + print("newSequences", len(newSequences), newSequences[0:10]) + + # Teraz musimy te liczby zabrane z pętli "zwrócić" tam gdzie się należą + # Zwroty musimy zrobić proporcjonalnie więc trzeba policzyć sumę subsekwencji + newLoopsCounts = [] + # Zapiszemy sobie te numery sekwencji które trzeba zmodyfikować, + # żeby nie lecieć niepotrzebnie kolejny raz po wszystkim + # docelowo może być tego sporo + seqId2mod = set() + + for loop, loopID, loopNumber in newLoops: + # subSeqLoopCounter to mianownik dla konkretnej pętli + # czyli liczba wszystkich sekwencji z pocątkiem ZX, ale nie ZXYX ani ZXQX + subSeqLoopCounter = 0 + # Znajdźmy wszystkie sekwencje ZX na które musimy rozdysponować loopNumber + for i, (seq, number) in enumerate(newSequences): + if loopID == seq[:len(loopID)]: + subSeqLoopCounter += number + # Do tej sekwencji musimy dodać liczebność + seqId2mod.add(i) + newLoopsCounts.append((loop, loopID, loopNumber, subSeqLoopCounter)) + + if debug: + print("newLoopsCounts", newLoopsCounts[0:10]) + + for i in seqId2mod: + seq, number = newSequences[i] + # Musimy zrobić licznik bo do jednej sekwencji możemy musieć dodać kilka razy jak były dwie pętle + toAdd = 0 + for loop, loopID, loopNumber, subSeqLoopCounter in newLoopsCounts: + if loopID == seq[:len(loopID)]: + # Dodajemy taką część loopNumber (udział) + # Jaki jest udział tej sekwencji seq we wszystkich które się zaczynały + # od seq[:len(loopID)] (ZX) czyli subSeqLoopCounter + # number - liczba wystąpień sekwencji DO której dodajemy + toAdd += loopNumber * (number / subSeqLoopCounter) + if debug: + print(seq, number, toAdd) + # Dodajemy sekwencję do naszej listy ze zmodyfikowaną liczebnością + newSequences[i] = (seq, number + toAdd) + + self.markovLoops.extend(newLoopsCounts.copy()) + + # Nadpisujemy liste sekwencji nad którą pracujemy w pętli. + sequences = newSequences # Normalizujemy i sortujemy - self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) + self.markov = sorted([(x, y / totalNextSum) for x, y in sequences], reverse=True, key=lambda x: x[1]) + self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y / totalNextSum) for x, y in sequences], + reverse=True, key=lambda x: x[1]) + self.markov_not_norm = sorted([(x, y) for x, y in sequences], reverse=True, key=lambda x: x[1]) return self.markovMapped if __name__ == "__main__": - import sys import argparse parser = argparse.ArgumentParser('Flow Analyzer') @@ -539,6 +624,7 @@ parser.add_argument('-c', nargs='+') parser.add_argument('-lmin', type=int, default=3) parser.add_argument('-lmax', type=int, default=7) + parser.add_argument('--fold', action='store_true') args = parser.parse_args() cleanRules = [ @@ -559,7 +645,10 @@ analyzer.prep_sequences(dropDuplicates=True) for length in range(args.lmin, args.lmax + 1): + if args.fold: + seq = analyzer.analyze_markov_fold(targetLength=length, foldLoops=True) + else: + seq = analyzer.analyze_markov(targetLength=length) name = "seq_L{}.json".format(length) - seq = analyzer.analyze_markov(targetLength=length) with open(name, "w") as f: json.dump(seq, f)
--- a/stress-tester/src/main/resources/flow_analyzer_fold.py Thu Jun 25 09:44:15 2020 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,655 +0,0 @@ -import csv -import re - -import pandas as pd -import ujson as json - -from collections import Counter -from itertools import groupby -from pathlib import Path -from urllib import parse - - -class flowAnalyzer: - def get_files_list(self, pathString="", glob=False, globString="", disallowedSuffixex = [".gz", ".zip"], debug=False): - '''Funkcja, której celem jest przygotowanie listy plików z logami do wczytania - Może działać na dwa sposoby przez podanie po prostu ścieżki pliku z logami - lub przez znalezienie wszystkich plików przez glob - ''' - # Tutaj stworzona zostanie na nowo lista plików w obiekcie - self.fileList = [] - - if glob: - # Wykorzystujemy pathlib aby lepiej działać między systemowo - path = Path(pathString) - for filename in path.glob(globString): - # Upewniamy się, że plik nie ma zakazanego rozszerzenia - if filename.suffix not in disallowedSuffixex: - self.fileList.append(filename) - else: - self.fileList.append(pathString) - - if debug: - print(self.fileList) - - def extract_lines(self, disallowedStrings, debug = False): - ''' - Czytanie wszystkich plików linia po linii. - Dodajemy sobie tylko proste warunki sprawdzenia poprawności linii przez zawieranie się stringów - ''' - # nie robimy self.lines, żeby nie marnować pamięci przy większych plikach i nie musieć explicite kasować - lines = [] - skippedLines = 0 - for file in self.fileList: - with open(file) as fp: - for line in fp: - # Sprawdzamy czy ta linijka nie zawiera jakiś "niedozwolonych fragmentów" - # UWAGA, poniższy if na pewno nie jest królem wydajności - if any([x in line for x in disallowedStrings]): - skippedLines +=1 - continue - lines.append(line.rstrip('\n')) - if debug: - print("Liczba pominiętych linii:", skippedLines) - return lines - - def get_events_line_regex(self, rePatter, disallowedStrings = ["system overloaded"], - dateFormat = "%d/%b/%Y:%H:%M:%S", debug = False): - ''' - Funkcja, której celem jest parsowanie logów z założeniem, że - możemy przejść regexem grupowym każdą linijkę każdego pliku na naszej liści - - UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. - Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. - Wszystko jedno jak to będzie w praktyce. - ''' - lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) - - pat = re.compile(rePatter) - rows = [] - for line in lines: - row = re.match(pat, line) - tdict = row.groupdict() - rows.append([tdict["id"], tdict["datetime"], tdict["requestMethod"], tdict["url"], tdict["protocol"], tdict["status"], tdict["agent"]]) - self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"]) - - # Linijka na potrzeby szybszych testów, do skasowania - # self.data = self.data.iloc[0:10000] - - # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować - self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat) - if debug: - print("Wczytana liczba wierszy i kolumn: ", self.data.shape) - - def get_events_line_json(self, disallowedStrings = ["system overloaded"], - dateFormat = None, columnMap = {}, debug = False): - ''' - Funkcja, której celem jest parsowanie logów z założeniem, że na początku jest data, a dalej spacją rozdzielone datetime - - UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii. - Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności. - Wszystko jedno jak to będzie w praktyce. - ''' - lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) - - rows = [] - for line in lines: - line = json.loads(line) - try: - line["httpreqheaders"] = line["httpreqheaders"][re.search("SESSION=", line["httpreqheaders"]).end():] - line["httpreqheaders"] = line["httpreqheaders"][:re.search("; ", line["httpreqheaders"]).start()] - except: - if debug: - print(line) - continue - rows.append([ - line[columnMap.get("id", "id")], - line[columnMap.get("datetime", "datetime")], - line[columnMap.get("requestMethod", "requestMethod")], - line[columnMap.get("url", "url")], - line[columnMap.get("protocol", "protocol")], - line[columnMap.get("status", "status")], - line[columnMap.get("agent", "agent")] - ]) - - self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"]) - - # Linijskan ap otrzeby szybszych testów do skasowania - # self.data = self.data.iloc[0:10000] - - # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować - self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat) - if debug: - print("Wczytana liczba wierszy i kolumn: ", self.data.shape) - - def get_events_line_csv(self, disallowedStrings = ["system overloaded"], - dateFormat = None, columnMap = {}, debug = False): - lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug) - - rows = [] - for row in csv.reader(lines, delimiter=',', quotechar='"'): - rows.append(row) - # print(row) - self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"]) - - # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować - self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat) - if debug: - print("Wczytana liczba wierszy i kolumn: ", self.data.shape) - - def get_data_excel(self, filename="", columnMap = {}): - df = pd.read_excel(filename) - df = df.rename(columns=columnMap) - df = df[columnMap.values()] - df["datetime"] = pd.to_datetime(df.datetime) - self.data = df - - def prep_data(self, cleanRows = True, cleanRules = None, dropMissing = True, debug=False): - if cleanRules is None: - self.cleanRules = [ - # Zasoby doatkowe typu css/js - ".css", ".js", - # Elementy graficzne - ".jpg", ".png", ".svg", ".gif", ".ico", - # Czcionki - ".ttf", ".woff", ".woff2", ".eot", - # Dokumenty - ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt", - # Inne elementy jak np strony sprawdzające captch - "captcha" - ] - else: - self.cleanRules = cleanRules - - - # Rozbijamy ścieżkę na abzowy url i argumenty przez znak "?" - - temp = self.data['url'].str.split('?', 1, expand=True) - - - # Konwersja na stringi - self.data["pathBase"] = temp[0].astype(str) - if temp.shape[1]<2: - self.data["pathArgs"] = "" - else: - self.data["pathArgs"] = temp[1].fillna("").astype(str) - - - # Proste czyszczenie i normalizowanie stringów (url, request method) - self.data["pathArgs"] = self.data["pathArgs"].str.lower().str.strip() - self.data["pathBase"] = self.data["pathBase"].str.lower().str.strip() - self.data["requestMethod"] = self.data["requestMethod"].str.lower().str.strip() - - if dropMissing: - shapeBefore = self.data.shape - self.data = self.data.dropna(subset=["id", "datetime", "pathBase"]) - if debug: - print("Rozmiar przed missingami:", shapeBefore) - print("Rozmiar po missingach:", self.data.shape) - - if cleanRows: - shapeBefore = self.data.shape - idsToRemove = [] - if debug: - print("Liczba wierszy spełniających regułę:") - for rule in self.cleanRules: - # UWAGA - # Poniższa linia nie jest na pewno bardzo wydajna, ale jest prosta i szybka - # Sposób czyszczenia to detal który jest do dopracowania / zmiany wedle uznania - temp = self.data.loc[self.data.pathBase.str.contains(rule, case=False, regex=False)].index.tolist() - if debug: - print(rule, len(temp)) - idsToRemove.extend(temp.copy()) - self.data = self.data.drop(idsToRemove) - - if debug: - print("Rozmiar przed czyszczeniem:", shapeBefore) - print("Rozmiar po czyszczeniu:", self.data.shape) - - def clean_base(self, maxPathDepth=False, whiteListParts = False): - if maxPathDepth: - splitted = [x.split("/")[0:maxPathDepth] for x in self.data["pathBase"]] - # Rozbijam na potrzeby czytelności i ewentualnie wydajności. - # Jeżeli jest max to najpierw go ograniczamy, jezlei robimy whitelist, ale bez max to tez musimy splitować - # Jeżeli jest tylko whitelist to nie znamy wartosci maxPathDepth - # Można tez przyjąć maxPathDept 1000 jako wartośc domyślną i mieć w jednej linii - elif whiteListParts: - splitted = [x.split("/") for x in self.data["pathBase"]] - - # Jeżeli white list to filtrujemy - if whiteListParts: - # splitted to lista list - # ulr to element splitted - # part to element url - splitted = [part for part in url if part in whiteListParts for url in splitted] - - # Musimy teraz "skleić" - if maxPathDepth or whiteListParts: - filteredBase = ["/".join(parts) for parts in splitted] - else: - filteredBase = self.data["pathBase"].tolist() - return filteredBase - - def clean_args(self, whiteListArgs = False, blacklistArgs=False): - - args = self.data["pathArgs"].map(parse.parse_qsl) - if whiteListArgs: - args1 = [] - for argList in args: - args1.append([(x,y) for x,y in argList if x in whiteListArgs]) - # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny - # Możliwe też, że w implementacje w jave whitelist będzie to wczesniej już na etapie parsowania - - args = args1 - elif blacklistArgs: - args1 = [] - for argList in args: - args1.append([(x,y) for x,y in argList if x not in blacklistArgs]) - # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny - # Możliwe też, że w implementacje w jave blacklist będzie to wczesniej już na etapie parsowania - - args = args1 - return args - - def explore(self, topN = 20, maxPathDepth=False, whiteListParts = False, whiteListArgs = False, blacklistArgs=False): - ''' - Zwraca statystyki dotyczące ścieżek, argumentów i wartości. - Pozwala zrozumieć co powinniśmy rozumieć przez unikatowy "krok" i z jakimi ustawieniami dokonać analizy - - maxPathDepth : mówi jak wiele elementów urla bierzemy pod uwagę (jaka maksymalna głebokość) - allowedParts : mówi jakie elementy urla są dozwolone. - - Filtrowanie argumentów pozwala na to aby nie brać pod uwage jakiś argumentów w analizie lub brać tylko wybrane - Jeżeli whiteListArgs jest zbiorem lub listą, to blacklistArgs nie jest brane pod uwagę - ''' - - filteredBase = self.clean_base(maxPathDepth, whiteListParts) - - countBase = sorted(list(Counter(filteredBase).items()), key=lambda x: x[1], reverse=True) - print("Top base url:") - for url in countBase[0:topN]: - print(url) - - args = self.clean_args(whiteListArgs, blacklistArgs) - - - urlArgs = [] - for url, argList in zip(filteredBase, args): - urlArgs.append(url+"|"+"_".join(sorted([x for x,y in argList]))) - - countUrlArg = sorted(list(Counter(urlArgs).items()), key=lambda x: x[1], reverse=True) - print("\n\nTop base url with args") - for value in countUrlArg[0:topN]: - print(value) - - - argPairs = sum(args, []) - argsOnly = [x for x,y in argPairs] - - countArgs = sorted(list(Counter(argsOnly).items()), key=lambda x: x[1], reverse=True) - print("\n\nTop args:") - for value in countArgs[0:topN]: - print(value) - - countPairs = sorted(list(Counter(argPairs).items()), key=lambda x: x[1], reverse=True) - print("\n\nTop pairs") - for value in countPairs[0:topN]: - print(value) - - def prep_sequences(self, useId=True, - useBase=True, maxPathDepth=False, whiteListParts = False, - useArgs = False, whiteListArgs = False, blacklistArgs=False, - useValues=False, - perUser = True, maxTimeDelta = 600, dropDuplicates = False, - groupOthers = False, othersLimit = 0.01, - debug = False): - ''' - Funkcja musi przygotować identyfikator "zdarzenia". - Każde "zdarzenie" może być identyfikowane przez dowolne kombinacje urli, czesci argumentów i wartości - Tutaj można mieć jeszcze więcej pomysłów, ale chyba na tym etapie wystarczy tyle. - - useId mówi czy mamy jakieś "id" które kontrolumy i czy chcemy go używać, czy moze lecieć po wszystkim chronologicznie. - - Bo można definiować jakieś white listy (na całe fragmenty nie tylko czesci) regexy na czesci itp. - Podobnie z argumentami. - W tej chwili możemy mieć: - * base - * base+args - * args - * args+values - * base+args+values - - Przy czym argsy i partsyBasa możemy kontrolować - - perUser: czy łaczyć wszystkie zdarzenia w jedną sekwencję po czasie czy działać indywidualnie epr user. - maxTimeDelta: sekundy maksymalny czas w ramach jednego "usera", - który może minąć aby kolejne zdrzenia pozostały w jednej sekwencji - dropDuplicates: czy jeżeli w sekwencja mamy ABBDCA to zamieniamy je na ABDCA przed rozpoczeciem analizy. - Duplikaty rozumiemy jako dwa zdarzenia tego samego usera w ramac htej samej sesji czasowej. - groupOthers: Pozwala na to aby wszystkie id rzadko wystepujące zamienić na "OTHER" - othersLimit: procentowa wartość (0-1) poniżej której zamieniamy na "OTHER" - - ''' - ids = [] - - if useBase: - filteredBase = self.clean_base(maxPathDepth, whiteListParts) - else: - filteredBase = ["url" for x in self.data["pathBase"]] - - - if useArgs: - args = self.clean_args(whiteListArgs, blacklistArgs) - - if useValues: - for url, argList in zip(filteredBase, args): - ids.append(url+"|"+"_".join(sorted([x+"+"+y for x,y in argList]))) - else: - for url, argList in zip(filteredBase, args): - ids.append(url+"|"+"_".join(sorted([x for x,y in argList]))) - - else: - ids = filteredBase - - if groupOthers: - icounts = dict(Counter(ids)) - icounts = {k:v/len(ids) for k,v in icounts.items()} - ids = [x if icounts[x] > othersLimit else "OTHER" for x in ids ] - - # Mapujemy "nazwy" sekwencji na integery, żeby nei trzymać w pamięci miliona niepotrzebnych stringów - uniqueIds = sorted(list(set(ids))) - self.idsMap = {idi:i for i, idi in enumerate(uniqueIds)} - self.idsMapRev = {i:idi for i, idi in enumerate(uniqueIds)} - - # Tworzymy w naszym zbiorze data kolumnę z id sekwencji - # Zbór data zawiera informacje o czasie i "id" więc sekwencje będziemy grupować później. - self.data["idsn"] = [self.idsMap[idi] for idi in ids] - - if perUser: - # Pogrupujmy wiersze po id - groupped = self.data.groupby("id") - else: - # Jeżeli nie grupujemy to i tak stworzymy strukturę danych analogiczną - # do efektu grupowania (lista krotek z nazwą i danymi) - groupped = [("all", self.data)] - - - - # Przygotujmy sobie liste wszystkich sekwencji z uwzględnieniem usera i maxTimeDelta - allSequences = [] - for name, df in groupped: - # Sortujemy po czasie w gramach grupy ID - df = df.sort_values("datetime", ascending=True) - - # Liczymy sobie deltę czasu pomiędzy zdarzeniami - df['datetimeDelta'] = (df['datetime']-df['datetime'].shift()).fillna(pd.Timedelta(seconds=0)).dt.seconds - - for i, (seqId, timeDelta) in enumerate(zip(df["idsn"], df["datetimeDelta"])): - # Poniżej nei biję rekordu wydajności, ale czytelności: - if i ==0 : - # Jeżeli to nowa gruap danego id to tworzymy nową listę - newSeq = [seqId] - elif timeDelta < maxTimeDelta: - # Jeżeli kolejny wiersz mieści się w czasie maxTimeDelta to dodajemy do dotychczasowej listy - newSeq.append(seqId) - else: - # W przeciwnym wypadku zamykamy poprzednią listę dodając do całego zbioru - allSequences.append(newSeq) - # I od nowa rozpoczynamy sekwencję - newSeq = [seqId] - # Na koniec dodajemy ostatni element który został - allSequences.append(newSeq) - - if dropDuplicates: - # usuwanei duplikatów można by zrobić wcześniej n apotrzeby zwiększenia wydajności - allSequences = [[k for k,g in groupby(seq)] for seq in allSequences] - - if debug: - print(allSequences[0:20]) - - self.allSequences = allSequences - - def analyze_markov(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001): - ''' - Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: - historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. - - ''' - - # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia - transitions = {} - # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id - # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id - - totalNextSum = 0 - for seq in self.allSequences: - if len(seq) >= historyLength+1: - # Początek sekwencji to start - for i in range(historyLength, len(seq)): - keyName = "|".join([str(x) for x in seq[i-historyLength:i]]) - # Znajdujemy słownik z przejściami dla danej historii. - val = transitions.get(keyName, {}) - # Aktualizujemy słownik dodajć 1 - val[seq[i]] = val.get(seq[i], 0)+1 - totalNextSum +=1 - # Aktualizujemy słownik w głównym słowniku - transitions[keyName] = val - # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa - # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji - # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu - - # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii - sequences = [] - for key, value in transitions.items(): - # key to stringowe id sekwencji - # value to słownik z przejściami - # value.values() zawiera liczebności - # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach - sequences.append((key.split("|"), sum(value.values()))) - - for k in range(targetLength-historyLength): - newSequences = [] - # Pogłębiamy sekwencje probabilistycznie - for seq, number in sequences: - # - keyName = "|".join([str(x) for x in seq[-historyLength:]]) - # Znajdujemy słownik z przejściami dla danej historii. - val = transitions.get(keyName, {}) - total = sum(val.values()) - - for nextEl, numberNext in val.items(): - if (number*numberNext/total)/totalNextSum > shareLimit: - newSequences.append((seq+[str(nextEl)], number*numberNext/total)) - sequences = newSequences - - # Normalizujemy i sortujemy - self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) - return self.markovMapped - - - def analyze_markov2(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001, - foldLoops=False, foldMaxElements=1, debug=False): - ''' - Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane: - historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element. - - ''' - - # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia - transitions = {} - # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id - # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id - - totalNextSum = 0 - for seq in self.allSequences: - if len(seq) >= historyLength+1: - # Początek sekwencji to start - for i in range(historyLength, len(seq)): - keyName = "|".join([str(x) for x in seq[i-historyLength:i]]) - # Znajdujemy słownik z przejściami dla danej historii. - val = transitions.get(keyName, {}) - # Aktualizujemy słownik dodajć 1 - val[seq[i]] = val.get(seq[i], 0)+1 - totalNextSum +=1 - # Aktualizujemy słownik w głównym słowniku - transitions[keyName] = val - # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa - # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji - # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu - - # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii - sequences = [] - for key, value in transitions.items(): - # key to stringowe id sekwencji - # value to słownik z przejściami - # value.values() zawiera liczebności - # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach - sequences.append((key.split("|"), sum(value.values()))) - - self.markovLoops = [] - - # Duża pętla - for k in range(targetLength-historyLength): - newSequences = [] - # Pogłębiamy sekwencje probabilistycznie - for seq, number in sequences: - # - keyName = "|".join([str(x) for x in seq[-historyLength:]]) - # Znajdujemy słownik z przejściami dla danej historii. - val = transitions.get(keyName, {}) - total = sum(val.values()) - - for nextEl, numberNext in val.items(): - if (number*numberNext/total)/totalNextSum > shareLimit: - newSequences.append((seq+[str(nextEl)], number*numberNext/total)) - sequences = newSequences.copy() - if debug: - print("sequences", len(sequences), sequences[0:10]) - if foldLoops: - newSequences = [] - newLoops = [] - for seq, number in sequences: - seqIsLoop = False - for foldWidth in range(1, foldMaxElements+1): - # Zataczamy pętlę jeżeli mamy {Z}XYX, gdzie: - # Z - cokolwiek wcześniej - # X ma szerokość historyLength - # zaś Y ma szerokość określoną w foldWidth maksymalnie foldMaxElements - if len(seq)>=(2*historyLength+foldWidth): -# print(seq, -# seq[-(2*historyLength+foldWidth):-(historyLength+foldWidth)], -# seq[-historyLength:], -# seq[-(2*historyLength+foldWidth):-(historyLength+foldWidth)] == seq[-historyLength:]) - if seq[-(2*historyLength+foldWidth):-(historyLength+foldWidth)] == seq[-historyLength:]: - # Jest pętle, usuwamy sekwencję (nie dodajemy) - # Sekwencję przenosimy do listę pętli - # Pierwszy element to ZXY - potrzebny do zaraportowania - # Drugi to ZX - potrzebny do modyfikacji liczebności - newLoops.append((seq.copy(), seq[:-(historyLength+foldWidth)], number)) - seqIsLoop = True - # breakujemy fora bo jak jest lupem n to nei ma sensu szukać n+1 - break - if not seqIsLoop: - # Nie ma pętli zostawiamy sekwencję nieruszoną - newSequences.append((seq, number)) - - # W tym miejscu każda sekwencja z sequences trafia albo na listę newSequences albo newLoops - # Poniżej robimy "dodawanie", którego nei trzeba robić jeżeli nei ma pętli - if len(newLoops)==0: - # Jeżeli to spełnione to ekwiwalent zakończ iteracji fora z Duża pętla - # Jeżeli nie znaleźlismy żadnej pętli to nie ma co mielić - # Przechodzimy do kolejnej iteracji - sequences = newSequences.copy() - continue - - if debug: - print("New loops", len(newLoops), newLoops[0:10]) - print("sequences", len(sequences), sequences[0:10]) - print("newSequences", len(newSequences), newSequences[0:10]) - - # Teraz musimy te liczby zabrane z pętli "zwrócić" tam gdzie się należą - # Zwroty musimy zrobić proporcjonalnie więc trzeba policzyć sumę subsekwencji - newLoopsCounts = [] - # Zapiszemy sobie te numery sekwencji które trzeba zmodyfikować, - # żeby nie lecieć niepotrzebnie kolejny raz po wszystkim - # docelowo może być tego sporo - seqId2mod = set() - - for loop, loopID, loopNumber in newLoops: - # subSeqLoopCounter to mianownik dla konkretnej pętli - # czyli liczba wszystkich sekwencji z pocątkiem ZX, ale nie ZXYX ani ZXQX - subSeqLoopCounter = 0 - # Znajdźmy wszystkie sekwencje ZX na które musimy rozdysponować loopNumber - for i, (seq, number) in enumerate(newSequences): - if loopID == seq[:len(loopID)]: - subSeqLoopCounter+=number - # Do tej sekwencji musimy dodać liczebność - seqId2mod.add(i) - newLoopsCounts.append((loop, loopID, loopNumber, subSeqLoopCounter)) - - if debug: - print("newLoopsCounts", newLoopsCounts[0:10]) - - for i in seqId2mod: - seq, number = newSequences[i] - # Musimy zrobić licznik bo do jednej sekwencji możemy musieć dodać kilka razy jak były dwie pętle - toAdd = 0 - for loop, loopID, loopNumber, subSeqLoopCounter in newLoopsCounts: - if loopID == seq[:len(loopID)]: - # Dodajemy taką część loopNumber (udział) - # Jaki jest udział tej sekwencji seq we wszystkich które się zaczynały - # od seq[:len(loopID)] (ZX) czyli subSeqLoopCounter - # number - liczba wystąpień sekwencji DO której dodajemy - toAdd += loopNumber * (number/subSeqLoopCounter) - if debug: - print(seq, number, toAdd) - # Dodajemy sekwencję do naszej listy ze zmodyfikowaną liczebnością - newSequences[i] = (seq,number+toAdd) - - self.markovLoops.extend(newLoopsCounts.copy()) - - # Nadpisujemy liste sekwencji nad którą pracujemy w pętli. - sequences = newSequences - - # Normalizujemy i sortujemy - self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1]) - self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1]) - return self.markovMapped - - -if __name__ == "__main__": - import sys - import argparse - - parser = argparse.ArgumentParser('Flow Analyzer') - parser.add_argument('input') - parser.add_argument('-c', nargs='+') - parser.add_argument('-lmin', type=int, default=3) - parser.add_argument('-lmax', type=int, default=7) - args = parser.parse_args() - - cleanRules = [ - ".css", ".js", ".jpg", ".png", ".svg", ".gif", ".ico", - ".ttf", ".woff", ".woff2", ".eot", - ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt", - "captcha" - ] - - if args.c: - for cleanRule in args.c: - cleanRules.append(cleanRule) - - analyzer = flowAnalyzer() - analyzer.get_files_list(pathString=args.input) - analyzer.get_events_line_csv(dateFormat='%Y-%m-%d_%H_%M_%S_%f') #yyyy-MM-dd_HH_mm_ss_SSS000 - analyzer.prep_data(cleanRules=cleanRules) - analyzer.prep_sequences(dropDuplicates=True) - - for length in range(args.lmin, args.lmax + 1): - name = "seq_L{}.json".format(length) - seq = analyzer.analyze_markov2(targetLength=length, foldLoops=True) - with open(name, "w") as f: - json.dump(seq, f) \ No newline at end of file