axamon/hermes

View on GitHub
parsers/regman.go

Summary

Maintainability
A
1 hr
Test Coverage
// Copyright 2019 Alberto Bregliano. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package parsers

import (
    "bufio"
    "bytes"
    "compress/gzip"
    "context"
    "encoding/csv"
    "fmt"
    "log"
    "os"
    "runtime"
    "strings"
    "sync"
    "time"

    "github.com/axamon/hermes/idvideoteca"

    "github.com/axamon/hermes/hasher"
    "github.com/axamon/hermes/zipfile"
)

const headerregman = "giornoq;cpeid;tgu;trap_timestamp;deviceid;devicetype;mode;originipaddress;averagebitrate;avgsskbps;bufferingduration;callerclass;callerrorcode;callerrormessage;callerrortype;callurl;errordesc;errorreason;eventname;levelbitrates;linespeedkbps;maxsschunkkbps;maxsskbps;minsskbps;streamingtype;videoduration;videoposition;videotitle;videotype;videourl;eventtype;fwversion;networktype;ra_version;update_time;trap_provider;mid;service_id;service_id_version;date_rif;video_provider;max_upstream_net_latency;min_upstream_net_latency;avg_upstream_net_latency;max_downstream_net_latency;min_downstream_net_latency;avg_downstream_net_latency;max_platform_latency;min_platform_latency;avg_platform_latency;packet_loss;preloaded_app_v;IDNGASP;IDCDN"
const timeRegmanFormat = "2006-01-02 15:04:05"

// var isREGMAN = regexp.MustCompile(`(?m)^.*deviceid.*$`)

// NGASPLock gestisce l'accesso simultaneo alla scrittura sul file di output.
var NGASPLock sync.Mutex

var wgNGASP sync.WaitGroup

var writerchannel = make(chan *string, 1)

// REGMAN è il parser delle trap provenienti da REGMAN.
func REGMAN(ctx context.Context, logfile string, maxNumRoutines int) (err error) {

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Utilizzerà il massimo dei processori disponibili meno uno.
    runtime.GOMAXPROCS(runtime.NumCPU() - 1)

    done := make(chan bool)

    // Apre nuovo file per salvare dati elaborati.
    newFile := strings.Split(logfile, ".csv.gz")[0] + ".offuscato.csv.gz"

    f, err := os.Create(newFile)
    if err != nil {
        return err
    }

    gw := gzip.NewWriter(f)
    defer gw.Close()

    csvWriter := csv.NewWriter(gw)
    csvWriter.Comma = ';'

    // Scrive headers.
    //gw.Write([]byte("#Log REGMAN prodotto da piattaforma Hermes Copyright 2019 alberto.bregliano@telecomitalia.it\n"))
    gw.Write([]byte(headerregman + "\n"))

    go func() {
        for {
            select {
            case row := <-writerchannel:
                gw.Write([]byte(*row))
            case <-done:
                return
            }
        }
    }()

    // Apri file zippato in memoria.
    content, err := zipfile.ReadAllGZ(ctx, logfile)
    if err != nil {
        log.Printf("Error impossibile leggere file NGASP %s, %s\n", logfile, err.Error())
        return err
    }

    r := bytes.NewReader(content)

    n := 0
    scan := bufio.NewScanner(r)
    for scan.Scan() {
        n++

        // Salta header.
        if n == 1 {
            continue
        }

        line := scan.Text()

        numRoutines := runtime.NumGoroutine()
        wgNGASP.Add(1)
        switch {
        case numRoutines > maxNumRoutines:
            ElaboraREGMAN(ctx, &line, gw)
        default:
            go ElaboraREGMAN(ctx, &line, gw)
        }
    }

    wgNGASP.Wait()
    done <- true
    // defer close(writerchannel)

    // Scrive footer.
    //gw.Write([]byte("#Numero di records: " + strconv.Itoa(n) + "\n"))
    gw.Flush()
    gw.Close()

    return err
}

// ElaboraREGMAN crea il file csv compresso con i campi sensibili offuscati.
func ElaboraREGMAN(ctx context.Context, line *string, gw *gzip.Writer) (err error) {

    ctx, cleanUP := context.WithCancel(ctx)
    defer cleanUP()

    defer wgNGASP.Done()

    // ricerca le fruzioni nell'intervallo temporale richiesto
    // l'intervallo temporale inzia con l'inzio di una fruizione

    // Splitta la linea nei suoi campi.
    // Il separatore per i log REGMAN è ";"
    s := strings.Split(*line, ";")

    // crea un idv vuoto
    var idv string

    // Se è un VOD Estrae id videoteca univoco del vod
    if strings.Contains(strings.ToLower(s[27]), "vod") {
        idv, _ = idvideoteca.Find(s[28])
    }

    clientip := s[6]

    // Crea IDNGASP come hash di ngasp.TGU + ngasp.CPEID + ngasp.IDVIDEOTECA non modificati
    rawIDNGASP := s[0] + s[1] + idv
    IDNGASP, err := hasher.StringSum(rawIDNGASP)

    rawIDCDN := clientip + idv
    IDCDN, err := hasher.StringSum(rawIDCDN)

    t, err := time.ParseInLocation(timeRegmanFormat, s[2], loc)
    if err != nil {
        log.Println(err.Error())
    }

    giornoq := giornoq(t)

    // recupera ip cliente

    // ! OFFUSCAMENTO CAMPI SENSIBILI

    // Effettue hash ip pubblico cliente.
    s[6], err = hasher.StringSumWithSalt(s[6], salt)

    // Effettue hash del cli cliente.
    s[1], err = hasher.StringSumWithSalt(s[1], salt)

    // Eliminazione campo titolo
    s[26] = "" // questo è il campo con il nome del film viene sostituito con idvideoteca
    s[36] = "" // nei games ci sono titoli che hanno apici
    s[33] = "" // a volte questo campo ha apici
    for n, l := range s {
        if strings.Contains(l, `'`) {
            fmt.Printf("Il record contiente caratteri non accettati: %d, %s\n", n, s)
        }
    }

    //Prepend field
    result := append([]string{giornoq}, s...)

    // Aggiunge IDNGASP alla fine
    result = append(result, IDNGASP, IDCDN)

    recordready := strings.Join(result, ";") + "\n"

    // Scrive dati.
    //err = csvWriter.Write(result)
    // NGASPLock.Lock()
    // gw.Write([]byte(recordready))
    // // gw.Flush()
    // NGASPLock.Unlock()
    writerchannel <- &recordready

    runtime.Gosched()
    return err
}