changeset 1186:8b605b57e68d

flow analyzer - in progress
author Devel 1
date Wed, 17 Jun 2020 13:33:53 +0200
parents a8cd127261a8
children 013fb325d040
files stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java stress-tester/src/main/resources/flow_analyzer.py stress-tester/src/test/java/com/passus/st/scanner/FlowAnalyzerCommandTest.java
diffstat 5 files changed, 711 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Wed Jun 17 12:39:18 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/client/SessionPayloadEvent.java	Wed Jun 17 13:33:53 2020 +0200
@@ -6,6 +6,8 @@
 
 /**
  * @author Mirosław Hawrot
+ * @param <R>
+ * @param <S>
  */
 public class SessionPayloadEvent<R, S> extends SessionEvent {
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/java/com/passus/st/scanner/FlowAnalyzerCommand.java	Wed Jun 17 13:33:53 2020 +0200
@@ -0,0 +1,96 @@
+package com.passus.st.scanner;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ *
+ * @author mikolaj.podbielski
+ */
+public class FlowAnalyzerCommand {
+
+    public static final boolean IS_WINDOWS = System.getProperty("os.name").contains("Windows");
+    public static final String DEFAULT_PYTHON_CMD = IS_WINDOWS ? "python" : "python3";
+    public static final String DEFAULT_SCRIPT_PATH = "flow_analyzer.py";
+    public static final String RESOURCE = "/flow_analyzer.py";
+
+    private String pythonCmd = DEFAULT_PYTHON_CMD;
+    private String scriptPath = DEFAULT_SCRIPT_PATH;
+    private String dataPath;
+
+    static void extractEmbeddedScript(File target) throws IOException {
+        target.delete();
+        extractResource(RESOURCE, target);
+    }
+
+    static void extractResource(String resource, File dest) throws IOException {
+        try (InputStream is = FlowAnalyzerCommand.class.getResourceAsStream(resource);) {
+            Path target = Paths.get(dest.toURI());
+            Files.copy(is, target);
+        }
+    }
+
+    void run() throws IOException, InterruptedException {
+        checkExists(scriptPath, "scriptPath");
+        checkExists(dataPath, "dataPath");
+        checkCmd(pythonCmd);
+
+        List<String> commandLine = Arrays.asList(pythonCmd, scriptPath, dataPath);
+        ProcessBuilder pb = new ProcessBuilder(commandLine);
+
+        boolean showOutput = true;
+        if (showOutput) {
+            pb.redirectErrorStream(true);
+            pb.inheritIO();
+        }
+
+        Process process = pb.start();
+        process.waitFor();
+    }
+
+    static void checkCmd(String cmd) throws IOException, InterruptedException {
+        try {
+            List<String> commandLine = Arrays.asList(cmd, "--version");
+            ProcessBuilder pb = new ProcessBuilder(commandLine);
+            Process process = pb.start();
+            process.waitFor();
+        } catch (Exception ex) {
+            throw new IllegalArgumentException("Cannot launch command");
+        }
+    }
+
+    static void checkExists(String path, String name) {
+        if (path == null) {
+            throw new IllegalArgumentException("Argument " + name + " must be not null.");
+        }
+        File file = new File(path);
+        if (!file.exists()) {
+            throw new IllegalArgumentException("File " + file.getAbsolutePath() + " does not exist.");
+        }
+    }
+
+    public static void main(String[] args) {
+        FlowAnalyzerCommand cmd = new FlowAnalyzerCommand();
+
+        int idx = 0;
+        switch (args.length) {
+            case 3:
+                cmd.pythonCmd = args[idx++];
+            case 2:
+                cmd.scriptPath = args[idx++];
+            case 1:
+                cmd.dataPath = args[idx++];
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+
+    }
+
+}
--- a/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java	Wed Jun 17 12:39:18 2020 +0200
+++ b/stress-tester/src/main/java/com/passus/st/scanner/HttpUrlSequencePayloadAnalyzer.java	Wed Jun 17 13:33:53 2020 +0200
@@ -1,28 +1,53 @@
 package com.passus.st.scanner;
 
 import com.passus.commons.annotations.Plugin;
+import com.passus.config.Configurable;
+import com.passus.config.Configuration;
+import com.passus.config.ConfigurationContext;
+import com.passus.config.annotations.NodeDefinitionCreate;
+import com.passus.config.schema.NodeDefinition;
+import com.passus.config.schema.NodeDefinitionCreator;
+import com.passus.net.http.HttpHeaders;
 import com.passus.net.http.HttpRequest;
+import com.passus.net.http.HttpResponse;
 import com.passus.st.client.DataEvents;
 import com.passus.st.client.Event;
 import com.passus.st.client.SessionPayloadEvent;
 import com.passus.st.metric.MetricSource;
 import com.passus.st.metric.MetricsContainer;
 import com.passus.st.plugin.PluginConstants;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
+import static com.passus.config.schema.ConfigurationSchemaBuilder.mapDef;
+import static com.passus.config.schema.ConfigurationSchemaBuilder.tupleDef;
 import static com.passus.st.Protocols.HTTP;
+import static com.passus.st.config.CommonNodeDefs.STRING_DEF;
 
 @Plugin(name = HttpUrlSequencePayloadAnalyzer.TYPE, category = PluginConstants.CATEGORY_SCANNER_ANALYZER)
-public class HttpUrlSequencePayloadAnalyzer implements ScannerAnalyzer, MetricSource {
+@NodeDefinitionCreate(HttpUrlSequencePayloadAnalyzer.NodeDefCreator.class)
+public class HttpUrlSequencePayloadAnalyzer implements ScannerAnalyzer, MetricSource, Configurable {
 
     public static final String TYPE = "httpUrlSequence";
 
     private HttpUrlSequences metric;
 
+    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss");
+
+    private UserIdExtractor userIdExtractor;
+    
     public String getType() {
         return TYPE;
     }
 
     @Override
+    public void configure(Configuration config, ConfigurationContext context) {
+        String userIdSource = config.getString("userIdSource");
+        userIdExtractor = resolveUserIdExtractor(userIdSource);
+        System.out.println("");
+    }
+
+    @Override
     public void activate() {
         metric = new HttpUrlSequences();
         metric.activate();
@@ -54,12 +79,12 @@
         if (event.getType() == SessionPayloadEvent.TYPE) {
             write((SessionPayloadEvent) event);
         } else if (event.getType() == DataEvents.DataEnd.TYPE) {
-
+            populateMetric();
         }
     }
 
     private void populateMetric() {
-
+        System.out.println("");
     }
 
     private void write(SessionPayloadEvent event) {
@@ -68,10 +93,47 @@
         }
 
         HttpRequest req = (HttpRequest) event.getRequest();
+        HttpResponse resp = (HttpResponse) event.getResponse();
         if (req != null) {
+            String uri = req.getUri().toString();
+            int qidx = uri.indexOf('?');
 
+            String path;
+            String query;
+            if (qidx < 0) {
+                path = uri;
+                query = null;
+            } else {
+                path = uri.substring(0, qidx);
+            }
+
+            String method = req.getMethod().toString();
+            String datetime = sdf.format(new Date(req.getTimestamp()));
+            String userId = userIdExtractor.extract(event);
+            String userAgent = req.getHeaders().get(HttpHeaders.USER_AGENT).toString(); // unused
+            String status = resp == null ? "" : resp.getStatus().toString(); // unused
+            String protocol = Integer.toString(req.getVersion()); // unused, eg. "HTTP/1.1"
+//pathBase pathArgs request_method datetime id agent status protocol
         }
     }
 
 
-}
\ No newline at end of file
+    public interface UserIdExtractor {
+        String extract(SessionPayloadEvent event);
+    }
+    
+    static UserIdExtractor resolveUserIdExtractor(String userIdSource) {
+        return (event) -> "id";
+    }
+    
+    public static class NodeDefCreator implements NodeDefinitionCreator {
+
+        @Override
+        public NodeDefinition create() {
+            return mapDef(
+                    tupleDef("userIdSource", STRING_DEF).setRequired(false)
+            );
+        }
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/main/resources/flow_analyzer.py	Wed Jun 17 13:33:53 2020 +0200
@@ -0,0 +1,514 @@
+import pandas as pd
+import re
+import gc
+from pathlib import Path
+
+from urllib import parse
+from collections import Counter
+
+from itertools import groupby
+
+import ujson as json
+
+
+class flowAnalyzer:
+    def get_files_list(self, pathString="", glob=False, globString="", disallowedSuffixex = [".gz", ".zip"], debug=False):
+        '''Funkcja, której celem jest przygotowanie listy plików z logami do wczytania
+        Może działać na dwa sposoby przez podanie po prostu ścieżki pliku z logami
+        lub przez znalezienie wszystkich plików przez glob
+        '''
+        # Tutaj stworzona zostanie na nowo lista plików w obiekcie
+        self.fileList = []
+        
+        if glob:
+            # Wykorzystujemy pathlib aby lepiej działać między systemowo
+            path = Path(pathString)
+            for filename in path.glob(globString):
+                # Upewniamy się, że plik nie ma zakazanego rozszerzenia
+                if filename.suffix not in disallowedSuffixex:
+                    self.fileList.append(filename)
+        else:
+            self.fileList.append(pathString)
+            
+        if debug:
+            print(self.fileList)
+    
+    def extract_lines(self, disallowedStrings, debug = False):
+        '''
+        Czytanie wszystkich plików linia po linii.
+        Dodajemy sobie tylko proste warunki sprawdzenia poprawności linii przez zawieranie się stringów
+        '''
+        # nie robimy self.lines, żeby nie marnować pamięci przy większych plikach i nie musieć explicite kasować
+        lines = []
+        skippedLines = 0
+        for file in self.fileList:
+            with open(file) as fp:
+                for line in fp:
+                    # Sprawdzamy czy ta linijka nie zawiera jakiś "niedozwolonych fragmentów"
+                    # UWAGA, poniższy if na pewno nie jest królem wydajności
+                    if any([x in line for x in disallowedStrings]):
+                        skippedLines +=1
+                        continue
+                    lines.append(line.rstrip('\n'))
+        if debug:
+            print("Liczba pominiętych linii:", skippedLines)
+        return lines
+    
+    def get_events_line_regex(self, rePatter, disallowedStrings = ["system overloaded"],
+                              dateFormat = "%d/%b/%Y:%H:%M:%S", debug = False):
+        '''
+        Funkcja, której celem jest parsowanie logów z założeniem, że
+        możemy przejść regexem grupowym każdą linijkę każdego pliku na naszej liści
+        
+        UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii.
+        Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności.
+        Wszystko jedno jak to będzie w praktyce.
+        '''
+        lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug)
+        
+        pat = re.compile(rePatter)
+        rows = []
+        for line in lines:
+            row = re.match(pat, line)
+            tdict = row.groupdict()
+            rows.append([tdict["id"], tdict["datetime"], tdict["requestMethod"], tdict["url"], tdict["protocol"], tdict["status"], tdict["agent"]])
+        self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"])
+        
+        # Linijskan ap otrzeby szybszych testów do skasowania
+        self.data = self.data.iloc[0:10000]
+        
+        # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować
+        self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat)
+        if debug:
+            print("Wczytana liczba wierszy i kolumn: ", self.data.shape)
+    def get_events_line_json(self, disallowedStrings = ["system overloaded"],
+                              dateFormat = None, columnMap = {}, debug = False):
+        '''
+        Funkcja, której celem jest parsowanie logów z założeniem, że na początku jest data, a dalej spacją rozdzielone datetime
+        
+        UWAGA: zastanawiałem się czy lepsze jest takie podejście czy może robienie regexów linia po linii.
+        Zostawiłem takie (gdzie najpierw zbieramy wszystkie linie, a potem robimy regexpy) dla czytelności.
+        Wszystko jedno jak to będzie w praktyce.
+        '''
+        lines = self.extract_lines(disallowedStrings=disallowedStrings, debug=debug)
+        
+        rows = []
+        for line in lines:
+            line = json.loads(line)
+            try:
+                line["httpreqheaders"] = line["httpreqheaders"][re.search("SESSION=", line["httpreqheaders"]).end():]
+                line["httpreqheaders"] = line["httpreqheaders"][:re.search("; ", line["httpreqheaders"]).start()]
+            except:
+                if debug:
+                    print(line)
+                continue
+            rows.append([
+                    line[columnMap.get("id", "id")],
+                    line[columnMap.get("datetime", "datetime")],
+                    line[columnMap.get("requestMethod", "requestMethod")], 
+                    line[columnMap.get("url", "url")], 
+                    line[columnMap.get("protocol", "protocol")], 
+                    line[columnMap.get("status", "status")], 
+                    line[columnMap.get("agent", "agent")]
+                        ])
+
+        self.data = df = pd.DataFrame(rows, columns=["id", "datetime", "requestMethod", "url", "protocol", "status", "agent"])
+        
+        # Linijskan ap otrzeby szybszych testów do skasowania
+        # self.data = self.data.iloc[0:10000]
+        
+        # Zmiana tekstowego formatu daty na datę pandasową, żeby można bylo skutecznie sortować
+        self.data["datetime"] = pd.to_datetime(self.data.datetime, format=dateFormat)
+        if debug:
+            print("Wczytana liczba wierszy i kolumn: ", self.data.shape)
+    
+    def get_data_excel(self, filename="", columnMap = {}):
+        df = pd.read_excel(filename)
+        df = df.rename(columns=columnMap)
+        df = df[columnMap.values()]
+        df["datetime"] = pd.to_datetime(df.datetime)
+        self.data = df
+    
+    def prep_data(self, cleanRows = True, cleanRules = None, dropMissing = True, debug=False):
+        if cleanRules is None:
+            self.cleanRules = [
+                # Zasoby doatkowe typu css/js
+                ".css", ".js",
+                # Elementy graficzne
+                ".jpg", ".png", ".svg", ".gif", ".ico",
+                # Czcionki
+                ".ttf", ".woff", ".woff2", ".eot",
+                # Dokumenty
+                ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt",
+                # Inne elementy jak np strony sprawdzające captch
+                "captcha"         
+             ]
+        else:
+            self.cleanRules = cleanRules
+            
+            
+        # Rozbijamy ścieżkę na abzowy url i argumenty przez znak "?"
+        
+        temp = self.data['url'].str.split('?', 1, expand=True)
+        
+
+        # Konwersja na stringi
+        self.data["pathBase"] = temp[0].astype(str)
+        if temp.shape[1]<2:
+            self.data["pathArgs"] = ""
+        else:
+            self.data["pathArgs"] = temp[1].fillna("").astype(str) 
+
+        
+        # Proste czyszczenie i normalizowanie stringów (url, request method)
+        self.data["pathArgs"] = self.data["pathArgs"].str.lower().str.strip()
+        self.data["pathBase"] = self.data["pathBase"].str.lower().str.strip()
+        self.data["requestMethod"] = self.data["requestMethod"].str.lower().str.strip()
+        
+        if dropMissing:
+            shapeBefore = self.data.shape
+            self.data = self.data.dropna(subset=["id", "datetime", "pathBase"])
+            if debug:
+                print("Rozmiar przed missingami:", shapeBefore)
+                print("Rozmiar po missingach:", self.data.shape)
+                
+        if cleanRows:
+            shapeBefore = self.data.shape
+            idsToRemove = []
+            if debug:
+                print("Liczba wierszy spełniających regułę:")
+            for rule in self.cleanRules:
+                # UWAGA
+                # Poniższa linia nie jest na pewno bardzo wydajna, ale jest prosta i szybka
+                # Sposób czyszczenia to detal który jest do dopracowania / zmiany wedle uznania
+                temp = self.data.loc[self.data.pathBase.str.contains(rule, case=False, regex=False)].index.tolist()
+                if debug:
+                    print(rule, len(temp))
+                idsToRemove.extend(temp.copy())
+            self.data = self.data.drop(idsToRemove)
+            
+            if debug:
+                print("Rozmiar przed czyszczeniem:", shapeBefore)
+                print("Rozmiar po czyszczeniu:", self.data.shape)
+        
+    def clean_base(self, maxPathDepth=False, whiteListParts = False):
+        if maxPathDepth:
+            splitted = [x.split("/")[0:maxPathDepth] for x in self.data["pathBase"]]   
+        # Rozbijam na potrzeby czytelności i ewentualnie wydajności.
+        # Jeżeli jest max to najpierw go ograniczamy, jezlei robimy whitelist, ale bez max to tez musimy splitować
+        # Jeżeli jest tylko whitelist to nie znamy wartosci maxPathDepth
+        # Można tez przyjąć maxPathDept 1000 jako wartośc domyślną i mieć w jednej linii
+        elif whiteListParts:
+            splitted = [x.split("/") for x in self.data["pathBase"]]
+        
+        # Jeżeli white list to filtrujemy
+        if whiteListParts:
+            # splitted to lista list
+            # ulr to element splitted
+            # part to element url
+            splitted = [part for part in url if part in whiteListParts for url in splitted]
+
+        # Musimy teraz "skleić"
+        if maxPathDepth or whiteListParts:
+            filteredBase = ["/".join(parts) for parts in splitted]
+        else:
+            filteredBase = self.data["pathBase"].tolist()
+        return filteredBase
+    
+    def clean_args(self, whiteListArgs = False, blacklistArgs=False):
+        
+        args = self.data["pathArgs"].map(parse.parse_qsl)
+        if whiteListArgs:
+            args1 = []
+            for argList in args:
+                args1.append([(x,y) for x,y in argList if x in whiteListArgs])
+            # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny
+            # Możliwe też, że w implementacje w jave whitelist będzie to wczesniej już na etapie parsowania
+            
+            args = args1
+        elif blacklistArgs:
+            args1 = []
+            for argList in args:
+                args1.append([(x,y) for x,y in argList if x not in blacklistArgs])
+            # Cały for i nadpisanie mogą być w jednej linii, ale wtedy kod jest mniej czytelny
+            # Możliwe też, że w implementacje w jave blacklist będzie to wczesniej już na etapie parsowania
+            
+            args = args1
+        return args
+    
+    def explore(self, topN = 20, maxPathDepth=False, whiteListParts = False, whiteListArgs = False, blacklistArgs=False):
+        '''
+        Zwraca statystyki dotyczące ścieżek, argumentów i wartości.
+        Pozwala zrozumieć co powinniśmy rozumieć przez unikatowy "krok" i z jakimi ustawieniami dokonać analizy
+        
+        maxPathDepth : mówi jak wiele elementów urla bierzemy pod uwagę (jaka maksymalna głebokość)
+        allowedParts : mówi jakie elementy urla są dozwolone.
+        
+        Filtrowanie argumentów pozwala na to aby nie brać pod uwage jakiś argumentów w analizie lub brać tylko wybrane
+        Jeżeli whiteListArgs jest zbiorem lub listą, to blacklistArgs nie jest brane pod uwagę
+        '''
+
+        filteredBase = self.clean_base(maxPathDepth, whiteListParts)
+        
+        countBase = sorted(list(Counter(filteredBase).items()), key=lambda x: x[1], reverse=True)
+        print("Top base url:")
+        for url in countBase[0:topN]:
+            print(url)
+            
+        args = self.clean_args(whiteListArgs, blacklistArgs)
+        
+        
+        urlArgs = []
+        for url, argList in zip(filteredBase, args):
+            urlArgs.append(url+"|"+"_".join(sorted([x for x,y in argList])))
+
+        countUrlArg = sorted(list(Counter(urlArgs).items()), key=lambda x: x[1], reverse=True)
+        print("\n\nTop base url with args")
+        for value in countUrlArg[0:topN]:
+            print(value)
+            
+            
+        argPairs = sum(args, [])
+        argsOnly = [x for x,y in argPairs]
+        
+        countArgs = sorted(list(Counter(argsOnly).items()), key=lambda x: x[1], reverse=True)
+        print("\n\nTop args:")
+        for value in countArgs[0:topN]:
+            print(value)
+        
+        countPairs = sorted(list(Counter(argPairs).items()), key=lambda x: x[1], reverse=True)
+        print("\n\nTop pairs")
+        for value in countPairs[0:topN]:
+            print(value)
+
+    def prep_sequences(self, useId=True,
+                       useBase=True, maxPathDepth=False, whiteListParts = False, 
+                       useArgs = False, whiteListArgs = False, blacklistArgs=False,
+                       useValues=False, 
+                      perUser = True, maxTimeDelta = 600, dropDuplicates = False,
+                       groupOthers = False, othersLimit = 0.01,
+                      debug = False):
+        '''
+        Funkcja musi przygotować identyfikator "zdarzenia".
+        Każde "zdarzenie" może być identyfikowane przez dowolne kombinacje urli, czesci argumentów i wartości
+        Tutaj można mieć jeszcze więcej pomysłów, ale chyba na tym etapie wystarczy tyle.
+        
+        useId mówi czy mamy jakieś "id" które kontrolumy i czy chcemy go używać, czy moze lecieć po wszystkim chronologicznie.
+        
+        Bo można definiować jakieś white listy (na całe fragmenty nie tylko czesci) regexy na czesci itp.
+        Podobnie z argumentami.
+        W tej chwili możemy mieć:
+        * base
+        * base+args
+        * args 
+        * args+values
+        * base+args+values
+        
+        Przy czym  argsy i partsyBasa możemy kontrolować
+        
+        perUser: czy łaczyć wszystkie zdarzenia w jedną sekwencję po czasie czy działać indywidualnie epr user.
+        maxTimeDelta: sekundy maksymalny czas w ramach jednego "usera",
+                    który może minąć aby kolejne zdrzenia pozostały w jednej sekwencji
+        dropDuplicates: czy jeżeli w sekwencja mamy ABBDCA to zamieniamy je na ABDCA przed rozpoczeciem analizy. 
+                    Duplikaty rozumiemy jako dwa zdarzenia tego samego usera w ramac htej samej sesji czasowej.
+        groupOthers: Pozwala na to aby wszystkie id rzadko wystepujące zamienić na "OTHER"
+        othersLimit: procentowa wartość (0-1) poniżej której zamieniamy na "OTHER"
+                    
+        '''
+        ids = []
+        
+        if useBase:
+            filteredBase = self.clean_base(maxPathDepth, whiteListParts)
+        else:
+            filteredBase = ["url" for x in self.data["pathBase"]]
+        
+        
+        if useArgs:
+            args = self.clean_args(whiteListArgs, blacklistArgs)
+
+            if useValues:
+                for url, argList in zip(filteredBase, args):
+                    ids.append(url+"|"+"_".join(sorted([x+"+"+y for x,y in argList])))
+            else:
+                for url, argList in zip(filteredBase, args):
+                    ids.append(url+"|"+"_".join(sorted([x for x,y in argList])))
+        
+        else:
+            ids = filteredBase
+                
+        if groupOthers:
+            icounts = dict(Counter(ids))
+            icounts = {k:v/len(ids) for k,v in icounts.items()}
+            ids = [x if icounts[x] > othersLimit else "OTHER" for x in ids ]
+                
+        # Mapujemy "nazwy" sekwencji na integery, żeby nei trzymać w pamięci miliona niepotrzebnych stringów
+        uniqueIds = sorted(list(set(ids)))
+        self.idsMap = {idi:i for i, idi in enumerate(uniqueIds)}
+        self.idsMapRev = {i:idi for i, idi in enumerate(uniqueIds)}
+        
+        # Tworzymy w naszym zbiorze data kolumnę z id sekwencji
+        # Zbór data zawiera informacje o czasie i "id" więc sekwencje będziemy grupować później.
+        self.data["idsn"] = [self.idsMap[idi] for idi in ids]
+        
+        if perUser:
+            # Pogrupujmy wiersze po id
+            groupped = self.data.groupby("id")
+        else:
+            # Jeżeli nie grupujemy to i tak stworzymy strukturę danych analogiczną
+            # do efektu grupowania (lista krotek z nazwą i danymi)
+            groupped = [("all", self.data)]
+
+            
+            
+        #  Przygotujmy sobie liste wszystkich sekwencji z uwzględnieniem usera i maxTimeDelta
+        allSequences = []
+        for name, df in groupped:
+            # Sortujemy po czasie w gramach grupy ID
+            df = df.sort_values("datetime", ascending=True)
+            
+            # Liczymy sobie deltę czasu pomiędzy zdarzeniami
+            df['datetimeDelta'] = (df['datetime']-df['datetime'].shift()).fillna(pd.Timedelta(seconds=0)).dt.seconds
+            
+            for i, (seqId, timeDelta) in enumerate(zip(df["idsn"], df["datetimeDelta"])):
+                # Poniżej nei biję rekordu wydajności, ale czytelności:
+                if i ==0 :
+                    # Jeżeli to nowa gruap danego id to tworzymy nową listę
+                    newSeq = [seqId]
+                elif timeDelta < maxTimeDelta:
+                    # Jeżeli kolejny wiersz mieści się w czasie maxTimeDelta to dodajemy do dotychczasowej listy
+                    newSeq.append(seqId)
+                else:
+                    # W przeciwnym wypadku zamykamy poprzednią listę dodając do całego zbioru
+                    allSequences.append(newSeq)
+                    # I od nowa rozpoczynamy sekwencję
+                    newSeq = [seqId]
+            # Na koniec dodajemy ostatni element który został
+            allSequences.append(newSeq)
+
+        if dropDuplicates:
+        # usuwanei duplikatów można by zrobić wcześniej n apotrzeby zwiększenia wydajności
+            allSequences = [[k for k,g in groupby(seq)] for seq in allSequences]
+        
+        if debug:
+            print(allSequences[0:20])
+                    
+        self.allSequences = allSequences
+        
+    def analyze_markov(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001):
+        '''
+        Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane:
+        historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element.
+                    
+        '''
+        
+        # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia
+        transitions = {}
+        # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id
+        # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id
+
+        totalNextSum = 0
+        for seq in self.allSequences:
+            if len(seq) >= historyLength+1:
+                # Początek sekwencji to start
+                for i in range(historyLength, len(seq)):
+                    keyName = "|".join([str(x) for x in seq[i-historyLength:i]])
+                    # Znajdujemy słownik z przejściami dla danej historii.
+                    val = transitions.get(keyName, {})
+                    # Aktualizujemy słownik dodajć 1
+                    val[seq[i]] = val.get(seq[i], 0)+1
+                    totalNextSum +=1
+                    # Aktualizujemy słownik w głównym słowniku
+                    transitions[keyName] = val
+        # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa
+        # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji
+        # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu
+        
+        # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii
+        sequences = []
+        for key, value in transitions.items():
+            # key to stringowe id sekwencji
+            # value to słownik z przejściami
+            # value.values() zawiera liczebności
+            # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach
+            sequences.append((key.split("|"), sum(value.values())))
+            
+        for k in range(targetLength-historyLength):
+            newSequences = []
+            # Pogłębiamy sekwencje probabilistycznie
+            for seq, number in sequences:
+                # 
+                keyName = "|".join([str(x) for x in seq[-historyLength:]])
+                # Znajdujemy słownik z przejściami dla danej historii.
+                val = transitions.get(keyName, {})
+                total = sum(val.values())
+                
+                for nextEl, numberNext in val.items():
+                    if (number*numberNext/total)/totalNextSum > shareLimit:
+                        newSequences.append((seq+[str(nextEl)], number*numberNext/total))
+            sequences = newSequences
+            
+        # Normalizujemy i sortujemy
+        self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        return self.markovMapped
+    
+    
+    def analyze_markov2(self, historyLength = 1, targetLength = 5, shareLimit = 0.0001, foldSequences=False, foldDepth=2):
+        '''
+        Funkcja która wykona analizę sekwencji. Funkcja zakłada, że id sekwencji już zostały przygotowane:
+        historyLength: dla jak wielu elementów bierzemy historię by przewidywać następny element.
+                    
+        '''
+        
+        # transitions to słownik słowników mówiący ile razy przechodzimy zjednej historii do kolejnego zdarzenia
+        transitions = {}
+        # To ile będzie kluczy w pierwszym słowniku zalezy od głębokosci historii i liczby unikatowych kombinacji id
+        # To ile będzie kluczy w drugich słownikach zalezy od liczby unikatowych id
+
+        totalNextSum = 0
+        for seq in self.allSequences:
+            if len(seq) >= historyLength+1:
+                # Początek sekwencji to start
+                for i in range(historyLength, len(seq)):
+                    keyName = "|".join([str(x) for x in seq[i-historyLength:i]])
+                    # Znajdujemy słownik z przejściami dla danej historii.
+                    val = transitions.get(keyName, {})
+                    # Aktualizujemy słownik dodajć 1
+                    val[seq[i]] = val.get(seq[i], 0)+1
+                    totalNextSum +=1
+                    # Aktualizujemy słownik w głównym słowniku
+                    transitions[keyName] = val
+        # Skoro już mamy wszystkie zdarzenia to wiemy ile razy która sekwencja będzie startowa
+        # w tradycyjnym podejściu markowa jesteśmy pewni gdzie jest początek sekwencji
+        # tutaj nie możemy być pewni więc zakładamy że może być w dowolnym miejscu
+        
+        # na początku będzie tyle "starterów" sekwencji ile mamy takich fragmentów w historii
+        sequences = []
+        for key, value in transitions.items():
+            # key to stringowe id sekwencji
+            # value to słownik z przejściami
+            # value.values() zawiera liczebności
+            # dana sekwencja key wystąpiła tyle razy co suma wartości liczników w wartościach
+            sequences.append((key.split("|"), sum(value.values())))
+            
+        for k in range(targetLength-historyLength):
+            newSequences = []
+            # Pogłębiamy sekwencje probabilistycznie
+            for seq, number in sequences:
+                # 
+                keyName = "|".join([str(x) for x in seq[-historyLength:]])
+                # Znajdujemy słownik z przejściami dla danej historii.
+                val = transitions.get(keyName, {})
+                total = sum(val.values())
+                
+                for nextEl, numberNext in val.items():
+                    if (number*numberNext/total)/totalNextSum > shareLimit:
+                        newSequences.append((seq+[str(nextEl)], number*numberNext/total))
+            sequences = newSequences
+            
+        # Normalizujemy i sortujemy
+        self.markov = sorted([(x, y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        self.markovMapped = sorted([([self.idsMapRev[int(z)] for z in x], y/totalNextSum) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        self.markov_not_norm = sorted([(x, y) for x,y in sequences], reverse=True, key=lambda x:x[1])
+        return self.markovMapped
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/stress-tester/src/test/java/com/passus/st/scanner/FlowAnalyzerCommandTest.java	Wed Jun 17 13:33:53 2020 +0200
@@ -0,0 +1,33 @@
+package com.passus.st.scanner;
+
+import com.passus.st.utils.TestResourceUtils;
+import java.io.File;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.Test;
+
+/**
+ *
+ * @author mikolaj.podbielski
+ */
+public class FlowAnalyzerCommandTest {
+    
+    @Test
+    public void testExtractEmbeddedScript() throws Exception {
+        File dir = TestResourceUtils.createTmpDir();
+        File target = new File(dir, "script.py");
+        target.deleteOnExit();
+        System.out.println("Extracting to " + target.getAbsolutePath());
+        FlowAnalyzerCommand.extractEmbeddedScript(target);
+        assertEquals(25_024, target.length());
+    }
+
+    public static void main(String[] args) throws Exception {
+        FlowAnalyzerCommand.checkCmd("python");
+        try {
+            FlowAnalyzerCommand.checkCmd("no-such-python");
+            fail("Should fail on non-existing command");
+        } catch (IllegalArgumentException ignore) {
+            
+        }
+    }
+}