waku-org/go-waku

View on GitHub
examples/chat2/chat.go

Summary

Maintainability
F
4 days
Test Coverage
package main

import (
    "chat2/pb"
    "context"
    "encoding/hex"
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
    "github.com/waku-org/go-waku/waku/v2/dnsdisc"
    "github.com/waku-org/go-waku/waku/v2/node"
    "github.com/waku-org/go-waku/waku/v2/payload"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/filter"
    "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
    "github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
    wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
    "github.com/waku-org/go-waku/waku/v2/protocol/relay"
    wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln"
    "github.com/waku-org/go-waku/waku/v2/utils"
    "google.golang.org/protobuf/proto"
)

// Chat represents a subscription to a single PubSub topic. Messages
// can be published to the topic with Chat.Publish, and received
// messages are pushed to the Messages channel.
type Chat struct {
    ctx       context.Context
    wg        sync.WaitGroup
    node      *node.WakuNode
    ui        UI
    uiReady   chan struct{}
    inputChan chan string
    options   Options

    C chan *protocol.Envelope

    nick string
}

func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
    chat := &Chat{
        ctx:       ctx,
        node:      node,
        options:   options,
        nick:      options.Nickname,
        uiReady:   make(chan struct{}, 1),
        inputChan: make(chan string, 100),
    }

    chat.ui = NewUIModel(chat.uiReady, chat.inputChan)

    topics := options.Relay.Topics.Value()
    if len(topics) == 0 {
        topics = append(topics, relay.DefaultWakuTopic)
    }

    if options.Filter.Enable {
        cf := protocol.ContentFilter{
            PubsubTopic:   relay.DefaultWakuTopic,
            ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
        }
        var filterOpt filter.FilterSubscribeOption
        peerID, err := options.Filter.NodePeerID()
        if err != nil {
            filterOpt = filter.WithAutomaticPeerSelection()
        } else {
            filterOpt = filter.WithPeer(peerID)
            chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
        }
        theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
        if err != nil {
            chat.ui.ErrorMessage(err)
        } else {
            chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified.
        }
    } else {

        for _, topic := range topics {
            sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
            if err != nil {
                chat.ui.ErrorMessage(err)
            } else {
                chat.C = make(chan *protocol.Envelope)
                go func() {
                    for e := range sub[0].Ch {
                        chat.C <- e
                    }
                }()
            }
        }
    }

    chat.wg.Add(7)
    go chat.parseInput()
    go chat.receiveMessages()

    connectionWg := sync.WaitGroup{}
    connectionWg.Add(2)

    go chat.welcomeMessage()

    go chat.connectionWatcher(&connectionWg, connNotifier)
    go chat.staticNodes(&connectionWg)
    go chat.discoverNodes(&connectionWg)
    go chat.retrieveHistory(&connectionWg)

    return chat
}

func (c *Chat) Stop() {
    c.wg.Wait()
    close(c.inputChan)
}

func (c *Chat) connectionWatcher(connectionWg *sync.WaitGroup, connNotifier <-chan node.PeerConnection) {
    defer c.wg.Done()

    for conn := range connNotifier {
        if conn.Connected {
            c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.String()))
        } else {
            c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.String()))
        }
    }
}

func (c *Chat) receiveMessages() {
    defer c.wg.Done()
    for {
        select {
        case <-c.ctx.Done():
            return
        case value := <-c.C:

            msgContentTopic := value.Message().ContentTopic
            if msgContentTopic != c.options.ContentTopic {
                continue // Discard messages from other topics
            }

            msg, err := decodeMessage(c.options.ContentTopic, value.Message())
            if err == nil {
                // send valid messages to the UI
                c.ui.ChatMessage(int64(msg.Timestamp), msg.Nick, string(msg.Payload))
            }
        }
    }
}
func (c *Chat) parseInput() {
    defer c.wg.Done()
    for {
        select {
        case <-c.ctx.Done():
            return
        case line := <-c.inputChan:
            c.ui.SetSending(true)
            go func() {
                defer c.ui.SetSending(false)

                // bail if requested
                if line == "/exit" {
                    c.ui.Quit()
                    fmt.Println("Bye!")
                    return
                }

                // add peer
                if strings.HasPrefix(line, "/connect") {
                    peer := strings.TrimPrefix(line, "/connect ")
                    c.wg.Add(1)
                    go func(peer string) {
                        defer c.wg.Done()

                        ma, err := multiaddr.NewMultiaddr(peer)
                        if err != nil {
                            c.ui.ErrorMessage(err)
                            return
                        }

                        peerID, err := ma.ValueForProtocol(multiaddr.P_P2P)
                        if err != nil {
                            c.ui.ErrorMessage(err)
                            return
                        }

                        c.ui.InfoMessage(fmt.Sprintf("Connecting to peer: %s", peerID))
                        ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
                        defer cancel()

                        err = c.node.DialPeerWithMultiAddress(ctx, ma)
                        if err != nil {
                            c.ui.ErrorMessage(err)
                        }
                    }(peer)
                    return
                }

                // list peers
                if line == "/peers" {
                    peers := c.node.Host().Network().Peers()
                    if len(peers) == 0 {
                        c.ui.InfoMessage("No peers available")
                    } else {
                        peerInfoMsg := "Peers: \n"
                        for _, p := range peers {
                            peerInfo := c.node.Host().Peerstore().PeerInfo(p)
                            peerProtocols, err := c.node.Host().Peerstore().GetProtocols(p)
                            if err != nil {
                                c.ui.ErrorMessage(err)
                                return
                            }
                            peerInfoMsg += fmt.Sprintf("• %s:\n", p.String())

                            var strProtocols []string
                            for _, p := range peerProtocols {
                                strProtocols = append(strProtocols, string(p))
                            }

                            peerInfoMsg += fmt.Sprintf("    Protocols: %s\n", strings.Join(strProtocols, ", "))
                            peerInfoMsg += "    Addresses:\n"
                            for _, addr := range peerInfo.Addrs {
                                peerInfoMsg += fmt.Sprintf("    - %s/p2p/%s\n", addr.String(), p.String())
                            }
                        }
                        c.ui.InfoMessage(peerInfoMsg)
                    }
                    return
                }

                // change nick
                if strings.HasPrefix(line, "/nick") {
                    newNick := strings.TrimSpace(strings.TrimPrefix(line, "/nick "))
                    if newNick != "" {
                        c.nick = newNick
                    } else {
                        c.ui.ErrorMessage(errors.New("invalid nickname"))
                    }
                    return
                }

                if line == "/help" {
                    c.ui.InfoMessage(`Available commands:
  /connect multiaddress - dials a node adding it to the list of connected peers
  /peers - list of peers connected to this node
  /nick newNick - change the user's nickname
  /exit - closes the app`)
                    return
                }

                c.SendMessage(line)
            }()
        }
    }
}

func (c *Chat) SendMessage(line string) {
    tCtx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
    defer func() {
        cancel()
    }()

    err := c.publish(tCtx, line)
    if err != nil {
        if err.Error() == "validation failed" {
            err = errors.New("message rate violation")
        }
        c.ui.ErrorMessage(err)
    }
}

func (c *Chat) publish(ctx context.Context, message string) error {
    msg := &pb.Chat2Message{
        Timestamp: uint64(c.node.Timesource().Now().Unix()),
        Nick:      c.nick,
        Payload:   []byte(message),
    }

    msgBytes, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    version := uint32(0)
    timestamp := utils.GetUnixEpochFrom(c.node.Timesource().Now())
    keyInfo := &payload.KeyInfo{
        Kind: payload.None,
    }

    p := new(payload.Payload)
    p.Data = msgBytes
    p.Key = keyInfo

    payload, err := p.Encode(version)
    if err != nil {
        return err
    }

    wakuMsg := &wpb.WakuMessage{
        Payload:      payload,
        Version:      proto.Uint32(version),
        ContentTopic: options.ContentTopic,
        Timestamp:    timestamp,
    }

    if c.options.RLNRelay.Enable {
        // for future version when we support more than one rln protected content topic,
        // we should check the message content topic as well
        err = c.node.RLNRelay().AppendRLNProof(wakuMsg, c.node.Timesource().Now())
        if err != nil {
            return err
        }

        rateLimitProof, err := wrln.BytesToRateLimitProof(wakuMsg.RateLimitProof)
        if err != nil {
            return err
        }

        c.ui.InfoMessage(fmt.Sprintf("RLN Epoch: %d", rateLimitProof.Epoch.Uint64()))
    }

    if c.options.LightPush.Enable {
        lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()}
        var peerID peer.ID
        peerID, err = options.LightPush.NodePeerID()
        if err != nil {
            lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
        } else {
            lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
        }

        _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...)
    } else {
        _, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
    }

    return err
}

func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Message, error) {
    keyInfo := &payload.KeyInfo{
        Kind: payload.None,
    }

    payload, err := payload.DecodePayload(wakumsg, keyInfo)
    if err != nil {
        return nil, err
    }

    msg := &pb.Chat2Message{}
    if err := proto.Unmarshal(payload.Data, msg); err != nil {
        return nil, err
    }

    return msg, nil
}

func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
    defer c.wg.Done()

    connectionWg.Wait() // Wait until node connection operations are done

    if !c.options.Store.Enable {
        return
    }

    var storeOpt legacy_store.HistoryRequestOption
    if c.options.Store.Node == nil {
        c.ui.InfoMessage("No store node configured. Choosing one at random...")
        storeOpt = legacy_store.WithAutomaticPeerSelection()
    } else {
        peerID, err := (*c.options.Store.Node).ValueForProtocol(multiaddr.P_P2P)
        if err != nil {
            c.ui.ErrorMessage(err)
            return
        }
        pID, err := peer.Decode(peerID)
        if err != nil {
            c.ui.ErrorMessage(err)
            return
        }
        storeOpt = legacy_store.WithPeer(pID)
        c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", peerID))

    }

    tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
    defer cancel()

    q := legacy_store.Query{
        ContentTopics: []string{options.ContentTopic},
    }

    response, err := c.node.LegacyStore().Query(tCtx, q,
        legacy_store.WithAutomaticRequestID(),
        storeOpt,
        legacy_store.WithPaging(false, 100))

    if err != nil {
        c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err))
    } else {
        if len(response.Messages) == 0 {
            c.ui.InfoMessage("0 historic messages available")
        } else {
            for _, msg := range response.Messages {
                c.C <- protocol.NewEnvelope(msg, msg.GetTimestamp(), relay.DefaultWakuTopic)
            }
        }
    }
}

func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) {
    defer c.wg.Done()
    defer connectionWg.Done()

    <-c.uiReady // wait until UI is ready

    wg := sync.WaitGroup{}

    wg.Add(len(options.StaticNodes))
    for _, n := range options.StaticNodes {
        go func(addr multiaddr.Multiaddr) {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
            defer cancel()

            c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String()))

            err := c.node.DialPeerWithMultiAddress(ctx, addr)
            if err != nil {
                c.ui.ErrorMessage(err)
            }
        }(n)
    }

    wg.Wait()

}

func (c *Chat) welcomeMessage() {
    defer c.wg.Done()

    <-c.uiReady // wait until UI is ready

    c.ui.InfoMessage("Welcome, " + c.nick + "!")
    c.ui.InfoMessage("type /help to see available commands \n")

    addrMessage := "Listening on:\n"
    for _, addr := range c.node.ListenAddresses() {
        addrMessage += "  -" + addr.String() + "\n"
    }
    c.ui.InfoMessage(addrMessage)

    if !c.options.RLNRelay.Enable {
        return
    }

    credential, err := c.node.RLNRelay().IdentityCredential()
    if err != nil {
        c.ui.Quit()
        fmt.Println(err.Error())
    }

    idx := c.node.RLNRelay().MembershipIndex()

    idTrapdoor := credential.IDTrapdoor
    idNullifier := credential.IDSecretHash
    idSecretHash := credential.IDSecretHash
    idCommitment := credential.IDCommitment

    rlnMessage := "RLN config:\n"
    rlnMessage += fmt.Sprintf("- Your membership index is: %d\n", idx)
    rlnMessage += fmt.Sprintf("- Your rln identity trapdoor is: 0x%s\n", hex.EncodeToString(idTrapdoor[:]))
    rlnMessage += fmt.Sprintf("- Your rln identity nullifier is: 0x%s\n", hex.EncodeToString(idNullifier[:]))
    rlnMessage += fmt.Sprintf("- Your rln identity secret hash is: 0x%s\n", hex.EncodeToString(idSecretHash[:]))
    rlnMessage += fmt.Sprintf("- Your rln identity commitment key is: 0x%s\n", hex.EncodeToString(idCommitment[:]))

    c.ui.InfoMessage(rlnMessage)
}

func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
    defer c.wg.Done()
    defer connectionWg.Done()

    <-c.uiReady // wait until UI is ready

    var dnsDiscoveryUrl string
    if options.Fleet != fleetNone {
        if options.Fleet == fleetTest {
            dnsDiscoveryUrl = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
        } else {
            // Connect to prod by default
            dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
        }
    }

    if options.DNSDiscovery.Enable && options.DNSDiscovery.URL != "" {
        dnsDiscoveryUrl = options.DNSDiscovery.URL
    }

    if dnsDiscoveryUrl != "" {
        c.ui.InfoMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl))
        nodes, err := dnsdisc.RetrieveNodes(c.ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver))
        if err != nil {
            c.ui.ErrorMessage(errors.New(err.Error()))
        } else {
            var nodeList []peer.AddrInfo
            for _, n := range nodes {
                nodeList = append(nodeList, n.PeerInfo)
            }
            c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList))
            wg := sync.WaitGroup{}
            wg.Add(len(nodeList))
            for _, n := range nodeList {
                go func(ctx context.Context, info peer.AddrInfo) {
                    defer wg.Done()

                    ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
                    defer cancel()
                    err = c.node.DialPeerWithInfo(ctx, info)
                    if err != nil {

                        c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.String(), err))
                    }
                }(c.ctx, n)

            }
            wg.Wait()
        }
    }
}