examples/chat2-reliable/chat.go
package main
import (
"chat2-reliable/pb"
"context"
"encoding/hex"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)
const (
maxMessageHistory = 100
)
type Chat struct {
ctx context.Context
wg sync.WaitGroup
node *node.WakuNode
ui UI
uiReady chan struct{}
inputChan chan string
options Options
C chan *protocol.Envelope
nick string
lamportTimestamp int32
bloomFilter *RollingBloomFilter
outgoingBuffer []UnacknowledgedMessage
incomingBuffer []*pb.Message
messageHistory []*pb.Message
mutex sync.Mutex
lamportTSMutex sync.Mutex
}
func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
chat := &Chat{
ctx: ctx,
node: node,
options: options,
nick: options.Nickname,
uiReady: make(chan struct{}, 1),
inputChan: make(chan string, 100),
lamportTimestamp: 0,
bloomFilter: NewRollingBloomFilter(),
outgoingBuffer: make([]UnacknowledgedMessage, 0),
incomingBuffer: make([]*pb.Message, 0),
messageHistory: make([]*pb.Message, 0),
mutex: sync.Mutex{},
lamportTSMutex: sync.Mutex{},
}
chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
topics := options.Relay.Topics.Value()
if len(topics) == 0 {
topics = append(topics, relay.DefaultWakuTopic)
}
if options.Filter.Enable {
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()
} else {
filterOpt = filter.WithPeer(peerID)
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
}
theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = theFilters[0].C // Picking first subscription since there is only 1 contentTopic specified.
}
} else {
for _, topic := range topics {
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = make(chan *protocol.Envelope)
go func() {
for e := range sub[0].Ch {
chat.C <- e
}
}()
}
}
}
connWg := sync.WaitGroup{}
connWg.Add(2)
chat.wg.Add(7) // Added 2 more goroutines for periodic tasks
go chat.parseInput()
go chat.receiveMessages()
go chat.welcomeMessage()
go chat.connectionWatcher(connNotifier)
go chat.staticNodes(&connWg)
go chat.discoverNodes(&connWg)
go chat.retrieveHistory(&connWg)
chat.initReliabilityProtocol() // Initialize the reliability protocol
return chat
}
func (c *Chat) Stop() {
c.wg.Wait()
close(c.inputChan)
}
func (c *Chat) connectionWatcher(connNotifier <-chan node.PeerConnection) {
defer c.wg.Done()
for {
select {
case conn := <-connNotifier:
if conn.Connected {
c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.String()))
} else {
c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.String()))
}
case <-c.ctx.Done():
return
}
}
}
func (c *Chat) receiveMessages() {
defer c.wg.Done()
for {
select {
case <-c.ctx.Done():
return
case value := <-c.C:
msgContentTopic := value.Message().ContentTopic
if msgContentTopic != c.options.ContentTopic {
continue // Discard messages from other topics
}
msg, err := decodeMessage(c.options.ContentTopic, value.Message())
if err != nil {
fmt.Printf("Error decoding message: %v\n", err)
continue
}
c.processReceivedMessage(msg)
}
}
}
func (c *Chat) parseInput() {
defer c.wg.Done()
var disconnectedPeers []peer.ID
for {
select {
case <-c.ctx.Done():
return
case line := <-c.inputChan:
c.ui.SetSending(true)
go func() {
defer c.ui.SetSending(false)
// bail if requested
if line == "/exit" {
c.ui.Quit()
fmt.Println("Bye!")
return
}
// add peer
if strings.HasPrefix(line, "/connect") {
peer := strings.TrimPrefix(line, "/connect ")
c.wg.Add(1)
go func(peer string) {
defer c.wg.Done()
ma, err := multiaddr.NewMultiaddr(peer)
if err != nil {
c.ui.ErrorMessage(err)
return
}
peerID, err := ma.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
c.ui.ErrorMessage(err)
return
}
c.ui.InfoMessage(fmt.Sprintf("Connecting to peer: %s", peerID))
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
defer cancel()
err = c.node.DialPeerWithMultiAddress(ctx, ma)
if err != nil {
c.ui.ErrorMessage(err)
}
}(peer)
return
}
// list peers
if line == "/peers" {
peers := c.node.Host().Network().Peers()
if len(peers) == 0 {
c.ui.InfoMessage("No peers available")
} else {
peerInfoMsg := "Peers: \n"
for _, p := range peers {
peerInfo := c.node.Host().Peerstore().PeerInfo(p)
peerProtocols, err := c.node.Host().Peerstore().GetProtocols(p)
if err != nil {
c.ui.ErrorMessage(err)
return
}
peerInfoMsg += fmt.Sprintf("• %s:\n", p.String())
var strProtocols []string
for _, p := range peerProtocols {
strProtocols = append(strProtocols, string(p))
}
peerInfoMsg += fmt.Sprintf(" Protocols: %s\n", strings.Join(strProtocols, ", "))
peerInfoMsg += " Addresses:\n"
for _, addr := range peerInfo.Addrs {
peerInfoMsg += fmt.Sprintf(" - %s/p2p/%s\n", addr.String(), p.String())
}
}
c.ui.InfoMessage(peerInfoMsg)
}
return
}
// change nick
if strings.HasPrefix(line, "/nick") {
newNick := strings.TrimSpace(strings.TrimPrefix(line, "/nick "))
if newNick != "" {
c.nick = newNick
} else {
c.ui.ErrorMessage(errors.New("invalid nickname"))
}
return
}
if line == "/help" {
c.ui.InfoMessage(`Available commands:
/connect multiaddress - dials a node adding it to the list of connected peers
/peers - list of peers connected to this node
/nick newNick - change the user's nickname
/disconnect - disconnect from all currently connected peers
/reconnect - attempt to reconnect to previously disconnected peers
/exit - closes the app`)
return
}
// Disconnect from peers
if line == "/disconnect" {
disconnectedPeers = c.disconnectFromPeers()
c.ui.InfoMessage("Disconnected from all peers. Use /reconnect to reconnect.")
return
}
// Reconnect to peers
if line == "/reconnect" {
if len(disconnectedPeers) == 0 {
c.ui.InfoMessage("No disconnection active. Use /disconnect first.")
} else {
c.reconnectToPeers(disconnectedPeers)
disconnectedPeers = nil
c.ui.InfoMessage("Reconnection initiated.")
}
return
}
// If no command matched, send as a regular message
c.SendMessage(line)
}()
}
}
}
func (c *Chat) publish(ctx context.Context, message *pb.Message) error {
msgBytes, err := proto.Marshal(message)
if err != nil {
return err
}
version := uint32(0)
timestamp := utils.GetUnixEpochFrom(c.node.Timesource().Now())
keyInfo := &payload.KeyInfo{
Kind: payload.None,
}
p := new(payload.Payload)
p.Data = msgBytes
p.Key = keyInfo
payload, err := p.Encode(version)
if err != nil {
return err
}
wakuMsg := &wpb.WakuMessage{
Payload: payload,
Version: proto.Uint32(version),
ContentTopic: c.options.ContentTopic,
Timestamp: timestamp,
}
if c.options.RLNRelay.Enable {
err = c.node.RLNRelay().AppendRLNProof(wakuMsg, c.node.Timesource().Now())
if err != nil {
return err
}
rateLimitProof, err := wrln.BytesToRateLimitProof(wakuMsg.RateLimitProof)
if err != nil {
return err
}
c.ui.InfoMessage(fmt.Sprintf("RLN Epoch: %d", rateLimitProof.Epoch.Uint64()))
}
if c.options.LightPush.Enable {
lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()}
var peerID peer.ID
peerID, err = c.options.LightPush.NodePeerID()
if err != nil {
lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
} else {
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
}
_, err = c.node.Lightpush().Publish(ctx, wakuMsg, lightOpt...)
} else {
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
}
return err
}
func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Message, error) {
keyInfo := &payload.KeyInfo{
Kind: payload.None,
}
payload, err := payload.DecodePayload(wakumsg, keyInfo)
if err != nil {
return nil, err
}
msg := &pb.Message{}
if err := proto.Unmarshal(payload.Data, msg); err != nil {
return nil, err
}
return msg, nil
}
func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()
connectionWg.Wait() // Wait until node connection operations are
if !c.options.Store.Enable {
return
}
var storeOpt store.RequestOption
if c.options.Store.Node == nil {
c.ui.InfoMessage("No store node configured. Choosing one at random...")
storeOpt = store.WithAutomaticPeerSelection()
} else {
pID, err := c.getStoreNodePID()
if err != nil {
c.ui.ErrorMessage(err)
return
}
storeOpt = store.WithPeer(*pID)
c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", pID.String()))
}
tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
q := store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, c.options.ContentTopic),
}
response, err := c.node.Store().Request(tCtx, q,
store.WithAutomaticRequestID(),
storeOpt,
store.WithPaging(false, 100))
if err != nil {
c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err))
} else {
if len(response.Messages()) == 0 {
c.ui.InfoMessage("0 historic messages available")
} else {
for _, msg := range response.Messages() {
c.C <- protocol.NewEnvelope(msg.Message, msg.Message.GetTimestamp(), relay.DefaultWakuTopic)
}
}
}
}
func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) {
defer c.wg.Done()
defer connectionWg.Done()
<-c.uiReady // wait until UI is ready
wg := sync.WaitGroup{}
wg.Add(len(c.options.StaticNodes))
for _, n := range c.options.StaticNodes {
go func(addr multiaddr.Multiaddr) {
defer wg.Done()
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
defer cancel()
c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String()))
err := c.node.DialPeerWithMultiAddress(ctx, addr)
if err != nil {
c.ui.ErrorMessage(err)
}
}(n)
}
wg.Wait()
}
func (c *Chat) welcomeMessage() {
defer c.wg.Done()
<-c.uiReady // wait until UI is ready
c.ui.InfoMessage("Welcome, " + c.nick + "!")
c.ui.InfoMessage("type /help to see available commands \n")
addrMessage := "Listening on:\n"
for _, addr := range c.node.ListenAddresses() {
addrMessage += " -" + addr.String() + "\n"
}
c.ui.InfoMessage(addrMessage)
if !c.options.RLNRelay.Enable {
return
}
credential, err := c.node.RLNRelay().IdentityCredential()
if err != nil {
c.ui.Quit()
}
idx := c.node.RLNRelay().MembershipIndex()
idTrapdoor := credential.IDTrapdoor
idNullifier := credential.IDSecretHash
idSecretHash := credential.IDSecretHash
idCommitment := credential.IDCommitment
rlnMessage := "RLN config:\n"
rlnMessage += fmt.Sprintf("- Your membership index is: %d\n", idx)
rlnMessage += fmt.Sprintf("- Your rln identity trapdoor is: 0x%s\n", hex.EncodeToString(idTrapdoor[:]))
rlnMessage += fmt.Sprintf("- Your rln identity nullifier is: 0x%s\n", hex.EncodeToString(idNullifier[:]))
rlnMessage += fmt.Sprintf("- Your rln identity secret hash is: 0x%s\n", hex.EncodeToString(idSecretHash[:]))
rlnMessage += fmt.Sprintf("- Your rln identity commitment key is: 0x%s\n", hex.EncodeToString(idCommitment[:]))
c.ui.InfoMessage(rlnMessage)
}
func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
defer c.wg.Done()
defer connectionWg.Done()
<-c.uiReady // wait until UI is ready
var dnsDiscoveryUrl string
if c.options.DNSDiscovery.Enable {
if c.options.Fleet != fleetNone {
if c.options.Fleet == fleetTest {
dnsDiscoveryUrl = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
} else {
// Connect to prod by default
dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
}
}
if c.options.DNSDiscovery.URL != "" {
dnsDiscoveryUrl = c.options.DNSDiscovery.URL
}
}
if dnsDiscoveryUrl != "" {
c.ui.InfoMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl))
nodes, err := dnsdisc.RetrieveNodes(c.ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(c.options.DNSDiscovery.Nameserver))
if err != nil {
c.ui.ErrorMessage(errors.New(err.Error()))
} else {
var nodeList []peer.AddrInfo
for _, n := range nodes {
nodeList = append(nodeList, n.PeerInfo)
}
c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList))
wg := sync.WaitGroup{}
wg.Add(len(nodeList))
for _, n := range nodeList {
go func(ctx context.Context, info peer.AddrInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Duration(20)*time.Second)
defer cancel()
err = c.node.DialPeerWithInfo(ctx, info)
if err != nil {
c.ui.ErrorMessage(fmt.Errorf("could not connect to %s: %w", info.ID.String(), err))
}
}(c.ctx, n)
}
wg.Wait()
}
}
}
func (c *Chat) disconnectFromPeers() []peer.ID {
disconnectedPeers := c.node.Host().Network().Peers()
for _, peerID := range disconnectedPeers {
c.node.Host().Network().ClosePeer(peerID)
}
return disconnectedPeers
}
func (c *Chat) reconnectToPeers(peers []peer.ID) {
for _, peerID := range peers {
// We're using a goroutine here to avoid blocking if a peer is unreachable
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
if _, err := c.node.Host().Network().DialPeer(ctx, p); err != nil {
c.ui.ErrorMessage(fmt.Errorf("failed to reconnect to peer %s: %w", p, err))
} else {
c.ui.InfoMessage(fmt.Sprintf("Successfully reconnected to peer %s", p))
}
}(peerID)
}
}
func generateUniqueID() string {
return uuid.New().String()
}
func (c *Chat) getRecentMessageIDs(n int) []string {
c.mutex.Lock()
defer c.mutex.Unlock()
result := make([]string, 0, n)
for i := len(c.messageHistory) - 1; i >= 0 && len(result) < n; i-- {
result = append(result, c.messageHistory[i].MessageId)
}
return result
}
func (c *Chat) getStoreNodePID() (*peer.ID, error) {
pID, err := utils.GetPeerID(*c.options.Store.Node)
if err != nil {
return nil, err
}
return &pID, nil
}