axamon/hermes

View on GitHub
producilog/kafka.go

Summary

Maintainability
A
2 hrs
Test Coverage
package producilog

import (
    "bufio"
    "bytes"
    "context"
    "fmt"
    "log"
    "reflect"
    "strings"
    "sync"
    "time"

    "github.com/axamon/hermes/zipfile"
    "github.com/segmentio/kafka-go"
)

// KafkaLocalConsumer consuma i messaggi in kafka.
func KafkaLocalConsumer(ctx context.Context, topic string, oldoffset int64) (data []byte, offset int64, err error) {

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    partition := 0

    conn, _ := kafka.DialLeader(ctx, "tcp", "localhost:9092", topic, partition)

    conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    defer conn.Close()
    conn.Seek(oldoffset, 0)

    batch := conn.ReadBatch(10e3, 10e6) // fetch 10KB min, 1MB max
    defer batch.Close()

    b := make([]byte, 10e3) // 10KB max per message
    for {
        _, err := batch.Read(b)
        if err != nil {
            break
        }
        fmt.Println(string(b))
    }

    return b, batch.Offset(), err
}

var writers = make(map[string]*kafka.Writer)
var records = make(map[string][]string)
var canale = make(chan *string, 1000)
var nlog int
var wg sync.WaitGroup

// KafkaLocalProducer produce messaggi in kafka.
func KafkaLocalProducer(ctx context.Context, logfile string) (err error) {

    // Crea il contesto e la funzione di cancellazione.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    start := time.Now()

    // Se non riesce a scrivere su Kafka procede senza andare in panico.
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered in f", r)
        }
    }()

    done := make(chan bool, 1)

    // Apre il file zippato e salva il contenuto in memoria.
    content, err := zipfile.ReadAllGZ(ctx, logfile)
    if err != nil {
        log.Printf("ERROR Impossibile leggere file CDN %s, %s\n", logfile, err.Error())
        return err
    }

    // Trasforma il contenuto in *Reader
    r := bytes.NewReader(content)

    // Crea uno scanner
    scan := bufio.NewScanner(r)

    fmt.Println("Ciclo Scan iniziato")
    startScan := time.Now()
    for scan.Scan() {
        // line := new(string) // si usa new per creare line nella heap
        line := scan.Text()

        if strings.HasPrefix(line, "#") {
            continue
        }
        //    fmt.Println(*line)
        topic := strings.Split(line, ",")[0]
        records[topic] = append(records[topic], line)
        if len(records[topic]) >= 100 {
            // canale <- topic
            wg.Add(1)
            elabora(ctx, topic)
        }
        //fmt.Println("linea caricata su canale")
    }
    fmt.Println("Ciclo Scan finito", time.Since(startScan))

    // Chiude il canale.
    close(canale)

    // Comunica che i cicli scan sono finiti.
    done <- true

    // Ripulisce i resti
    keys := reflect.ValueOf(records).MapKeys()
    for _, key := range keys {
        wg.Add(1)
        elabora(ctx, key.String())
    }

    // Attende che tutte le elaborazioni siano finite.
    wg.Wait()

    // Mostra quanti records sono stati processati.
    fmt.Println(nlog)

    // Mostra quanto tempo รจ stato richiesto per terminare elaborazione.
    fmt.Println(time.Since(start))

    return
}

func elabora(ctx context.Context, topic string) {
    fmt.Println("Inizio Goroutine")
    defer wg.Done()

    fmt.Println("Creo Writer")
    // time.Sleep(2 * time.Microsecond)
    w := kafka.NewWriter(kafka.WriterConfig{Brokers: []string{"localhost:9092"}, Topic: topic})
    defer w.Close()

    for _, line := range records[topic] {

        strings.Split(line, ",")
        // time.Sleep(2 * time.Microsecond)
        fmt.Println("Produco Log 1")
        err := w.WriteMessages(ctx, kafka.Message{Value: []byte(line)})
        if err != nil {
            log.Printf("Error Impossibile produrre record in kafka\n")
        }
    }
    fmt.Println("Prodotto log su: ", topic)
    nlog++
    fmt.Println(nlog)
    return

}