ipfs/ipfs-cluster

View on GitHub
adder/util.go

Summary

Maintainability
A
0 mins
Test Coverage
package adder

import (
    "context"
    "errors"
    "sync"

    "github.com/ipfs-cluster/ipfs-cluster/api"
    "go.uber.org/multierr"

    cid "github.com/ipfs/go-cid"
    ipld "github.com/ipfs/go-ipld-format"
    peer "github.com/libp2p/go-libp2p/core/peer"
    rpc "github.com/libp2p/go-libp2p-gorpc"
)

// ErrBlockAdder is returned when adding a to multiple destinations
// block fails on all of them.
var ErrBlockAdder = errors.New("failed to put block on all destinations")

// BlockStreamer helps streaming nodes to multiple destinations, as long as
// one of them is still working.
type BlockStreamer struct {
    dests     []peer.ID
    rpcClient *rpc.Client
    blocks    <-chan api.NodeWithMeta

    ctx    context.Context
    cancel context.CancelFunc
    errMu  sync.Mutex
    err    error
}

// NewBlockStreamer creates a BlockStreamer given an rpc client, allocated
// peers and a channel on which the blocks to stream are received.
func NewBlockStreamer(ctx context.Context, rpcClient *rpc.Client, dests []peer.ID, blocks <-chan api.NodeWithMeta) *BlockStreamer {
    bsCtx, cancel := context.WithCancel(ctx)

    bs := BlockStreamer{
        ctx:       bsCtx,
        cancel:    cancel,
        dests:     dests,
        rpcClient: rpcClient,
        blocks:    blocks,
        err:       nil,
    }

    go bs.streamBlocks()
    return &bs
}

// Done returns a channel which gets closed when the BlockStreamer has
// finished.
func (bs *BlockStreamer) Done() <-chan struct{} {
    return bs.ctx.Done()
}

func (bs *BlockStreamer) setErr(err error) {
    bs.errMu.Lock()
    bs.err = err
    bs.errMu.Unlock()
}

// Err returns any errors that happened after the operation of the
// BlockStreamer, for example when blocks could not be put to all nodes.
func (bs *BlockStreamer) Err() error {
    bs.errMu.Lock()
    defer bs.errMu.Unlock()
    return bs.err
}

func (bs *BlockStreamer) streamBlocks() {
    defer bs.cancel()

    // Nothing should be sent on out.
    // We drain though
    out := make(chan struct{})
    go func() {
        for range out {
        }
    }()

    errs := bs.rpcClient.MultiStream(
        bs.ctx,
        bs.dests,
        "IPFSConnector",
        "BlockStream",
        bs.blocks,
        out,
    )

    combinedErrors := multierr.Combine(errs...)

    // FIXME: replicate everywhere.
    if len(multierr.Errors(combinedErrors)) == len(bs.dests) {
        logger.Error(combinedErrors)
        bs.setErr(ErrBlockAdder)
    } else if combinedErrors != nil {
        logger.Warning("there were errors streaming blocks, but at least one destination succeeded")
        logger.Warning(combinedErrors)
    }
}

// IpldNodeToNodeWithMeta converts an ipld.Node to api.NodeWithMeta.
func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta {
    size, err := n.Size()
    if err != nil {
        logger.Warn(err)
    }

    return api.NodeWithMeta{
        Cid:     api.NewCid(n.Cid()),
        Data:    n.RawData(),
        CumSize: size,
    }
}

// BlockAllocate helps allocating blocks to peers.
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
    // Find where to allocate this file
    var allocsStr []peer.ID
    err := rpc.CallContext(
        ctx,
        "",
        "Cluster",
        "BlockAllocate",
        api.PinWithOpts(api.CidUndef, pinOpts),
        &allocsStr,
    )
    return allocsStr, err
}

// Pin helps sending local RPC pin requests.
func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
    if pin.ReplicationFactorMin < 0 {
        pin.Allocations = []peer.ID{}
    }
    logger.Debugf("adder pinning %+v", pin)
    var pinResp api.Pin
    return rpc.CallContext(
        ctx,
        "", // use ourself to pin
        "Cluster",
        "Pin",
        pin,
        &pinResp,
    )
}

// ErrDAGNotFound is returned whenever we try to get a block from the DAGService.
var ErrDAGNotFound = errors.New("dagservice: a Get operation was attempted while cluster-adding (this is likely a bug)")

// BaseDAGService partially implements an ipld.DAGService.
// It provides the methods which are not needed by ClusterDAGServices
// (Get*, Remove*) so that they can save adding this code.
type BaseDAGService struct {
}

// Get always returns errNotFound
func (dag BaseDAGService) Get(ctx context.Context, key cid.Cid) (ipld.Node, error) {
    return nil, ErrDAGNotFound
}

// GetMany returns an output channel that always emits an error
func (dag BaseDAGService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
    out := make(chan *ipld.NodeOption, 1)
    out <- &ipld.NodeOption{Err: ErrDAGNotFound}
    close(out)
    return out
}

// Remove is a nop
func (dag BaseDAGService) Remove(ctx context.Context, key cid.Cid) error {
    return nil
}

// RemoveMany is a nop
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []cid.Cid) error {
    return nil
}