status-im/status-go

View on GitHub
discovery/muxer.go

Summary

Maintainability
A
0 mins
Test Coverage
A
97%
package discovery

import (
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/p2p/discv5"
)

// NewMultiplexer creates Multiplexer instance.
func NewMultiplexer(discoveries []Discovery) Multiplexer {
    return Multiplexer{discoveries}
}

// Multiplexer allows to use multiple discoveries behind single Discovery interface.
type Multiplexer struct {
    discoveries []Discovery
}

// Running should return true if at least one discovery is running
func (m Multiplexer) Running() (rst bool) {
    for i := range m.discoveries {
        rst = rst || m.discoveries[i].Running()
    }
    return rst
}

// Start every discovery and stop every started in case if at least one fails.
func (m Multiplexer) Start() (err error) {
    started := []int{}
    for i := range m.discoveries {
        if err = m.discoveries[i].Start(); err != nil {
            break
        }
        started = append(started, i)
    }
    if err != nil {
        for _, i := range started {
            _ = m.discoveries[i].Stop()
        }
    }
    return err
}

// Stop every discovery.
func (m Multiplexer) Stop() (err error) {
    messages := []string{}
    for i := range m.discoveries {
        if err = m.discoveries[i].Stop(); err != nil {
            messages = append(messages, err.Error())
        }
    }
    if len(messages) != 0 {
        return fmt.Errorf("failed to stop discoveries: %s", strings.Join(messages, "; "))
    }
    return nil
}

// Register passed topic and stop channel to every discovery and waits till it will return.
func (m Multiplexer) Register(topic string, stop chan struct{}) error {
    errors := make(chan error, len(m.discoveries))
    for i := range m.discoveries {
        i := i
        go func() {
            errors <- m.discoveries[i].Register(topic, stop)
        }()
    }
    total := 0
    messages := []string{}
    for err := range errors {
        total++
        if err != nil {
            messages = append(messages, err.Error())
        }
        if total == len(m.discoveries) {
            break
        }
    }
    if len(messages) != 0 {
        return fmt.Errorf("failed to register %s: %s", topic, strings.Join(messages, "; "))
    }
    return nil
}

// Discover shares topic and channles for receiving results. And multiplexer periods that are sent to period channel.
func (m Multiplexer) Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error {
    var (
        periods  = make([]chan time.Duration, len(m.discoveries))
        messages = []string{}
        wg       sync.WaitGroup
        mu       sync.Mutex
    )
    wg.Add(len(m.discoveries) + 1)
    for i := range m.discoveries {
        i := i
        periods[i] = make(chan time.Duration, 2)
        go func() {
            err := m.discoveries[i].Discover(topic, periods[i], found, lookup)
            if err != nil {
                mu.Lock()
                messages = append(messages, err.Error())
                mu.Unlock()
            }
            wg.Done()
        }()
    }
    go func() {
        for {
            newPeriod, ok := <-period
            for i := range periods {
                if !ok {
                    close(periods[i])
                } else {
                    periods[i] <- newPeriod
                }
            }
            if !ok {
                wg.Done()
                return
            }
        }
    }()
    wg.Wait()
    if len(messages) != 0 {
        return fmt.Errorf("failed to discover topic %s: %s", topic, strings.Join(messages, "; "))
    }
    return nil
}