// Package ipfshttp implements an IPFS Cluster IPFSConnector component. It
// uses the IPFS HTTP API to communicate with IPFS.
package ipfshttp

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ipfs-cluster/ipfs-cluster/api"
    "github.com/ipfs-cluster/ipfs-cluster/observations"
    "github.com/tv42/httpunix"

    files "github.com/ipfs/boxo/files"
    gopath "github.com/ipfs/boxo/path"
    ipfspinner "github.com/ipfs/boxo/pinning/pinner"
    cid "github.com/ipfs/go-cid"
    logging "github.com/ipfs/go-log/v2"
    rpc "github.com/libp2p/go-libp2p-gorpc"
    peer "github.com/libp2p/go-libp2p/core/peer"
    manet "github.com/multiformats/go-multiaddr/net"
    "github.com/multiformats/go-multicodec"
    multihash "github.com/multiformats/go-multihash"

    "go.opencensus.io/plugin/ochttp"
    "go.opencensus.io/plugin/ochttp/propagation/tracecontext"
    "go.opencensus.io/stats"
    "go.opencensus.io/trace"
)

// DNSTimeout is used when resolving DNS multiaddresses in this module
var DNSTimeout = 5 * time.Second

var logger = logging.Logger("ipfshttp")

// Connector implements the IPFSConnector interface
// and provides a component which is used to perform
// on-demand requests against the configured IPFS daemon
// (such as a pin request).
type Connector struct {
    // Struct alignment: these atomically-accessed 64-bit fields must be
    // placed first to guarantee 8-byte alignment on 32-bit platforms.
    updateMetricCount uint64
    ipfsPinCount      int64

    ctx    context.Context
    cancel func()
    ready  chan struct{}

    config      *Config
    nodeAddr    string
    nodeNetwork string

    rpcClient *rpc.Client
    rpcReady  chan struct{}

    client *http.Client // client to ipfs daemon

    failedRequests atomic.Uint64 // count failed requests.
    reqRateLimitCh chan struct{}

    shutdownLock sync.Mutex
    shutdown     bool
    wg           sync.WaitGroup
}

type ipfsError struct {
    path    string
    code    int
    Message string
}

func (ie ipfsError) Error() string {
    return fmt.Sprintf(
        "IPFS error (%s). Code: %d. Message: %s",
        ie.path,
        ie.code,
        ie.Message,
    )
}

type ipfsUnpinnedError ipfsError

func (unpinned ipfsUnpinnedError) Is(target error) bool {
    ierr, ok := target.(ipfsError)
    if !ok {
        return false
    }
    return strings.HasSuffix(ierr.Message, "not pinned")
}

func (unpinned ipfsUnpinnedError) Error() string {
    return ipfsError(unpinned).Error()
}
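
// Note on usage: the Is method above matches on the target argument, so
// callers check for the unpinned case with the arguments in this order
// (see PinLsCid below):
//
//  if errors.Is(ipfsUnpinnedError{}, err) {
//      // err is an ipfsError whose message ends in "not pinned"
//  }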

type ipfsIDResp struct {
    ID        string
    Addresses []string
}

type ipfsResolveResp struct {
    Path string
}

type ipfsRepoGCResp struct {
    Key   cid.Cid
    Error string
}

type ipfsPinsResp struct {
    Pins     []string
    Progress int
}

type ipfsSwarmPeersResp struct {
    Peers []ipfsPeer
}

type ipfsBlockPutResp struct {
    Key  api.Cid
    Size int
}

type ipfsPeer struct {
    Peer string
}

// NewConnector creates the component and leaves it ready to be started
func NewConnector(cfg *Config) (*Connector, error) {
    err := cfg.Validate()
    if err != nil {
        return nil, err
    }

    nodeNetwork, nodeAddr, err := manet.DialArgs(cfg.NodeAddr)
    if err != nil {
        return nil, err
    }

    c := &http.Client{} // timeouts are handled by context timeouts

    if nodeNetwork == "unix" {
        unixTransport := &httpunix.Transport{
            DialTimeout: time.Second,
        }
        unixTransport.RegisterLocation("ipfs", nodeAddr)
        t := &http.Transport{}
        t.RegisterProtocol(httpunix.Scheme, unixTransport)
        c.Transport = t
    }

    if cfg.Tracing {
        c.Transport = &ochttp.Transport{
            Base:           http.DefaultTransport,
            Propagation:    &tracecontext.HTTPFormat{},
            StartOptions:   trace.StartOptions{SpanKind: trace.SpanKindClient},
            FormatSpanName: func(req *http.Request) string { return req.Host + ":" + req.URL.Path + ":" + req.Method },
            NewClientTrace: ochttp.NewSpanAnnotatingClientTrace,
        }
    }

    ctx, cancel := context.WithCancel(context.Background())

    ipfs := &Connector{
        ctx:            ctx,
        cancel:         cancel,
        ready:          make(chan struct{}),
        config:         cfg,
        nodeAddr:       nodeAddr,
        nodeNetwork:    nodeNetwork,
        rpcReady:       make(chan struct{}, 1),
        reqRateLimitCh: make(chan struct{}),
        client:         c,
    }

    initializeMetrics(ctx)

    go ipfs.rateLimiter()

    go ipfs.run()
    return ipfs, nil
}

func initializeMetrics(ctx context.Context) {
    // initialize metrics
    stats.Record(ctx, observations.PinsIpfsPins.M(0))
    stats.Record(ctx, observations.PinsPinAdd.M(0))
    stats.Record(ctx, observations.PinsPinAddError.M(0))
    stats.Record(ctx, observations.BlocksPut.M(0))
    stats.Record(ctx, observations.BlocksAddedSize.M(0))
    stats.Record(ctx, observations.BlocksAdded.M(0))
    stats.Record(ctx, observations.BlocksAddedError.M(0))
}

// rateLimiter issues ticks in the reqRateLimitCh that allow requests to
// proceed. See doPostCtx.
func (ipfs *Connector) rateLimiter() {
    isRateLimiting := false

    // TODO: The rate-limiter is configured to start rate-limiting after
    // 10 failed requests at a rate of 1 req/s. This should probably be
    // configurable.
    for {
        failed := ipfs.failedRequests.Load()
        switch {
        case failed == 0:
            if isRateLimiting {
                // This does not always print: it only
                // happens when there were requests
                // waiting to read from the channel.
                logger.Warning("Lifting rate limit")
            }
            isRateLimiting = false
        case failed > 0 && failed <= 10:
            isRateLimiting = false
        case failed > 10:
            if !isRateLimiting {
                logger.Warning("Rate-limiting requests to 1req/s")
            }
            isRateLimiting = true
            time.Sleep(time.Second)
        }

        // Send tick
        select {
        case <-ipfs.ctx.Done():
            close(ipfs.reqRateLimitCh)
            return
        case ipfs.reqRateLimitCh <- struct{}{}:
            // Note that the channel is unbuffered: we
            // block here until a request reads from it,
            // which only happens when failed > 0.
        }
    }
}
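
// In effect: after more than 10 consecutive failed requests, each new
// request blocks in doPostCtx waiting for a tick, which the loop above
// emits at most once per second, i.e. roughly 1 req/s until a request
// succeeds and resets the failure counter.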

// run waits for the rpcReady signal, polls the IPFS daemon until it
// becomes available, and then triggers swarm connections to the ipfs
// daemons of other cluster peers.
func (ipfs *Connector) run() {
    <-ipfs.rpcReady

    // wait for IPFS to be available
    i := 0
    for {
        select {
        case <-ipfs.ctx.Done():
            return
        default:
        }
        i++
        _, err := ipfs.ID(ipfs.ctx)
        if err == nil {
            close(ipfs.ready)
            break
        }
        if i%10 == 0 {
            logger.Warningf("ipfs does not seem to be available after %d retries", i)
        }

        // Requests will be rate-limited when going faster.
        time.Sleep(time.Second)
    }

    // Do not shutdown while launching threads
    // -- prevents race conditions with ipfs.wg.
    ipfs.shutdownLock.Lock()
    defer ipfs.shutdownLock.Unlock()

    if ipfs.config.ConnectSwarmsDelay == 0 {
        return
    }

    // This runs ipfs swarm connect to the daemons of other cluster members
    ipfs.wg.Add(1)
    go func() {
        defer ipfs.wg.Done()

        // It does not hurt to wait a little bit, e.g. for
        // cluster peers which are started at the same time
        // as the ipfs daemon...
        tmr := time.NewTimer(ipfs.config.ConnectSwarmsDelay)
        defer tmr.Stop()
        select {
        case <-tmr.C:
            // Do not block this goroutine if ConnectSwarms
            // hangs; otherwise we hang during shutdown.
            go ipfs.ConnectSwarms(ipfs.ctx)
        case <-ipfs.ctx.Done():
            return
        }
    }()
}

// SetClient makes the component ready to perform RPC
// requests.
func (ipfs *Connector) SetClient(c *rpc.Client) {
    ipfs.rpcClient = c
    ipfs.rpcReady <- struct{}{}
}

// Shutdown stops any listeners and stops the component from taking
// any requests.
func (ipfs *Connector) Shutdown(ctx context.Context) error {
    _, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown")
    defer span.End()

    ipfs.shutdownLock.Lock()
    defer ipfs.shutdownLock.Unlock()

    if ipfs.shutdown {
        logger.Debug("already shutdown")
        return nil
    }

    logger.Info("stopping IPFS Connector")

    ipfs.cancel()
    close(ipfs.rpcReady)

    ipfs.wg.Wait()
    ipfs.shutdown = true

    return nil
}

// Ready returns a channel which is closed when the first test request to
// the IPFS daemon succeeds.
func (ipfs *Connector) Ready(ctx context.Context) <-chan struct{} {
    return ipfs.ready
}

// ID performs an ID request against the configured
// IPFS daemon. It returns the fetched information.
// If the request fails, or the parsing fails, it
// returns an error.
func (ipfs *Connector) ID(ctx context.Context) (api.IPFSID, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/ID")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()

    body, err := ipfs.postCtx(ctx, "id", "", nil)
    if err != nil {
        return api.IPFSID{}, err
    }

    var res ipfsIDResp
    err = json.Unmarshal(body, &res)
    if err != nil {
        return api.IPFSID{}, err
    }

    pID, err := peer.Decode(res.ID)
    if err != nil {
        return api.IPFSID{}, err
    }

    id := api.IPFSID{
        ID: pID,
    }

    mAddrs := make([]api.Multiaddr, 0, len(res.Addresses))
    for _, strAddr := range res.Addresses {
        mAddr, err := api.NewMultiaddr(strAddr)
        if err != nil {
            logger.Warningf("cannot parse IPFS multiaddress: %s (%s)... ignoring", strAddr, err)
            continue
        }
        mAddrs = append(mAddrs, mAddr)
    }
    id.Addresses = mAddrs
    return id, nil
}

func pinArgs(maxDepth api.PinDepth) string {
    q := url.Values{}
    switch {
    case maxDepth < 0:
        q.Set("recursive", "true")
    case maxDepth == 0:
        q.Set("recursive", "false")
    default:
        q.Set("recursive", "true")
        q.Set("max-depth", strconv.Itoa(int(maxDepth)))
    }
    return q.Encode()
}
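
// For illustration, the query strings produced by the cases above
// (url.Values.Encode sorts keys alphabetically):
//
//  pinArgs(-1) // "recursive=true"
//  pinArgs(0)  // "recursive=false"
//  pinArgs(2)  // "max-depth=2&recursive=true"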

// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Pin")
    defer span.End()

    hash := pin.Cid
    maxDepth := pin.MaxDepth

    pinStatus, err := ipfs.PinLsCid(ctx, pin)
    if err != nil {
        return err
    }

    if pinStatus.IsPinned(maxDepth) {
        logger.Debug("IPFS object is already pinned: ", hash)
        return nil
    }

    defer ipfs.updateInformerMetric(ctx)

    ctx, cancelRequest := context.WithCancel(ctx)
    defer cancelRequest()

    // If the pin has origins, tell ipfs to connect to a maximum of 10.
    bound := len(pin.Origins)
    if bound > 10 {
        bound = 10
    }
    for _, orig := range pin.Origins[0:bound] {
        // do it in the background, ignoring errors.
        go func(o string) {
            logger.Debugf("swarm-connect to origin before pinning: %s", o)
            _, err := ipfs.postCtx(
                ctx,
                fmt.Sprintf("swarm/connect?arg=%s", o),
                "",
                nil,
            )
            if err != nil {
                logger.Debug(err)
                return
            }
            logger.Debugf("swarm-connect success to origin: %s", o)
        }(url.QueryEscape(orig.String()))
    }

    // If we have a pin-update, and the old object
    // is pinned recursively, then do pin/update.
    // Otherwise do a normal pin.
    if from := pin.PinUpdate; from.Defined() {
        fromPin := api.PinWithOpts(from, pin.PinOptions)
        pinStatus, _ := ipfs.PinLsCid(ctx, fromPin)
        if pinStatus.IsPinned(-1) { // pinned recursively.
            // As a side note, if PinUpdate == pin.Cid, we are
            // somehow pinning an already pinned thing and we'd
            // better use update for that
            return ipfs.pinUpdate(ctx, from, pin.Cid)
        }
    }

    // Pin request and timeout if there is no progress
    outPins := make(chan int)
    go func() {
        var lastProgress int
        lastProgressTime := time.Now()

        ticker := time.NewTicker(ipfs.config.PinTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                if time.Since(lastProgressTime) > ipfs.config.PinTimeout {
                    // timeout request
                    cancelRequest()
                    return
                }
            case p := <-outPins:
                // ipfs will send status messages every second
                // or so, but we need to make sure there was
                // progress by looking at the number of nodes
                // fetched.
                if p > lastProgress {
                    lastProgress = p
                    lastProgressTime = time.Now()
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    stats.Record(ipfs.ctx, observations.PinsPinAdd.M(1))
    err = ipfs.pinProgress(ctx, hash, maxDepth, outPins)
    if err != nil {
        stats.Record(ipfs.ctx, observations.PinsPinAddError.M(1))
        return err
    }
    totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1)
    stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))

    logger.Info("IPFS Pin request succeeded: ", hash)
    return nil
}

// pinProgress pins an item and sends the number of fetched nodes on a
// channel. It blocks until done or until an error occurs. pinProgress
// always closes the out channel and never blocks on sending to it:
// progress values are dropped when the reader is not ready.
func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth api.PinDepth, out chan<- int) error {
    defer close(out)

    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinProgress")
    defer span.End()

    pinArgs := pinArgs(maxDepth)
    path := fmt.Sprintf("pin/add?arg=%s&%s&progress=true", hash, pinArgs)

    body, err := ipfs.postCtxStreamResponse(ctx, path, "", nil)
    if err != nil {
        return err
    }
    defer body.Close()

    dec := json.NewDecoder(body)
    for {
        var pins ipfsPinsResp
        if err := dec.Decode(&pins); err != nil {
            // If we canceled the request we should tell the user
            // (in case dec.Decode() exited cleanly with an EOF).
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                if err == io.EOF {
                    return nil // clean exit. Pinned!
                }
                return err // error decoding
            }
        }

        select {
        case out <- pins.Progress:
        default:
        }
    }
}
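
// For reference, a sketch of the streamed response this decodes: ipfs
// emits one JSON object per progress update and a final one listing the
// pins (field names per ipfsPinsResp above; values illustrative):
//
//  {"Progress": 10}
//  {"Progress": 42}
//  {"Pins": ["bafy..."]}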

func (ipfs *Connector) pinUpdate(ctx context.Context, from, to api.Cid) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate")
    defer span.End()

    path := fmt.Sprintf("pin/update?arg=%s&arg=%s&unpin=false", from, to)
    _, err := ipfs.postCtx(ctx, path, "", nil)
    if err != nil {
        return err
    }
    totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1)
    stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))
    logger.Infof("IPFS Pin Update request succeeded. %s -> %s (unpin=false)", from, to)
    return nil
}

// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(ctx context.Context, hash api.Cid) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Unpin")
    defer span.End()

    if ipfs.config.UnpinDisable {
        return errors.New("ipfs unpinning is disallowed by configuration on this peer")
    }

    defer ipfs.updateInformerMetric(ctx)

    path := fmt.Sprintf("pin/rm?arg=%s", hash)

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.UnpinTimeout)
    defer cancel()

    // We call unpin in any case; if the CID is not pinned,
    // we ignore the error (although this is a bit flaky).
    _, err := ipfs.postCtx(ctx, path, "", nil)
    if err != nil {
        ipfsErr, ok := err.(ipfsError)
        if !ok || ipfsErr.Message != ipfspinner.ErrNotPinned.Error() {
            return err
        }
        logger.Debug("IPFS object is already unpinned: ", hash)
        return nil
    }

    totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, -1)
    stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins))

    logger.Info("IPFS Unpin request succeeded:", hash)
    return nil
}

// PinLs performs a "pin ls --type typeFilter" request against the configured
// IPFS daemon and sends the results on the given channel. Returns when done.
func (ipfs *Connector) PinLs(ctx context.Context, typeFilters []string, out chan<- api.IPFSPinInfo) error {
    defer close(out)
    bodies := make([]io.ReadCloser, len(typeFilters))

    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/PinLs")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()

    var err error
    var totalPinCount int64
    defer func() {
        // Only update the stored pin count after a clean run.
        if err == nil {
            atomic.StoreInt64(&ipfs.ipfsPinCount, totalPinCount)
            stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPinCount))
        }
    }()

nextFilter:
    for i, typeFilter := range typeFilters {
        // Post and read streaming response
        path := "pin/ls?stream=true&type=" + typeFilter
        bodies[i], err = ipfs.postCtxStreamResponse(ctx, path, "", nil)
        if err != nil {
            return err
        }
        defer bodies[i].Close()

        dec := json.NewDecoder(bodies[i])

        for {
            select {
            case <-ctx.Done():
                err = fmt.Errorf("aborting pin/ls operation: %w", ctx.Err())
                logger.Error(err)
                return err
            default:
            }

            var ipfsPin api.IPFSPinInfo
            err = dec.Decode(&ipfsPin)
            if err == io.EOF {
                break nextFilter
            }
            if err != nil {
                err = fmt.Errorf("error decoding ipfs pin: %w", err)
                return err
            }

            select {
            case <-ctx.Done():
                err = fmt.Errorf("aborting pin/ls operation: %w", ctx.Err())
                logger.Error(err)
                return err
            case out <- ipfsPin:
                totalPinCount++
            }
        }
    }

    return nil
}

// PinLsCid performs a "pin ls <hash>" request. It will use "type=recursive" or
// "type=direct" (or other) depending on the given pin's MaxDepth setting.
// It returns an api.IPFSPinStatus for that hash.
func (ipfs *Connector) PinLsCid(ctx context.Context, pin api.Pin) (api.IPFSPinStatus, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/PinLsCid")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()

    if !pin.Defined() {
        return api.IPFSPinStatusBug, errors.New("calling PinLsCid without a defined CID")
    }

    pinType := pin.MaxDepth.ToPinMode().String()
    lsPath := fmt.Sprintf("pin/ls?stream=true&arg=%s&type=%s", pin.Cid, pinType)
    body, err := ipfs.postCtxStreamResponse(ctx, lsPath, "", nil)
    if err != nil {
        if errors.Is(ipfsUnpinnedError{}, err) {
            return api.IPFSPinStatusUnpinned, nil
        }
        return api.IPFSPinStatusError, err
    }
    defer body.Close()

    var res api.IPFSPinInfo
    dec := json.NewDecoder(body)

    err = dec.Decode(&res)
    if err != nil {
        logger.Error("error parsing pin/ls?arg=cid response")
        return api.IPFSPinStatusError, err
    }

    return res.Type, nil
}

// ConnectSwarms requests the ipfs addresses of other peers and
// triggers ipfs swarm connect requests
func (ipfs *Connector) ConnectSwarms(ctx context.Context) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/ConnectSwarms")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()

    in := make(chan struct{})
    close(in)
    out := make(chan api.ID)
    go func() {
        err := ipfs.rpcClient.Stream(
            ctx,
            "",
            "Cluster",
            "Peers",
            in,
            out,
        )
        if err != nil {
            logger.Error(err)
        }
    }()

    for id := range out {
        ipfsID := id.IPFS
        if id.Error != "" || ipfsID.Error != "" {
            continue
        }
        for _, addr := range ipfsID.Addresses {
            // This is a best-effort attempt.
            // We ignore errors, which happen often
            // when passing in a bunch of addresses.
            _, err := ipfs.postCtx(
                ctx,
                fmt.Sprintf("swarm/connect?arg=%s", url.QueryEscape(addr.String())),
                "",
                nil,
            )
            if err != nil {
                logger.Debug(err)
                continue
            }
            logger.Debugf("ipfs successfully connected to %s", addr)
        }
    }
    return nil
}

// ConfigKey fetches the IPFS daemon configuration and retrieves the value for
// a given configuration key. For example, "Datastore/StorageMax" will return
// the value for StorageMax in the Datastore configuration object.
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
    ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()
    res, err := ipfs.postCtx(ctx, "config/show", "", nil)
    if err != nil {
        return nil, err
    }

    var cfg map[string]interface{}
    err = json.Unmarshal(res, &cfg)
    if err != nil {
        logger.Error(err)
        return nil, err
    }

    path := strings.SplitN(keypath, "/", 2)
    if len(path) == 0 {
        return nil, errors.New("cannot lookup without a path")
    }

    return getConfigValue(path, cfg)
}

func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, error) {
    value, ok := cfg[path[0]]
    if !ok {
        return nil, errors.New("key not found in configuration")
    }

    if len(path) == 1 {
        return value, nil
    }

    switch v := value.(type) {
    case map[string]interface{}:
        return getConfigValue(path[1:], v)
    default:
        return nil, errors.New("invalid path")
    }
}
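
// A minimal sketch of the lookup above, assuming a daemon config that
// contains {"Datastore": {"StorageMax": "10GB"}}:
//
//  cfg := map[string]interface{}{
//      "Datastore": map[string]interface{}{"StorageMax": "10GB"},
//  }
//  v, _ := getConfigValue([]string{"Datastore", "StorageMax"}, cfg)
//  // v == "10GB"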

// RepoStat returns the DiskUsage and StorageMax repo/stat values from the
// ipfs daemon, in bytes, wrapped as an IPFSRepoStat object.
func (ipfs *Connector) RepoStat(ctx context.Context) (api.IPFSRepoStat, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/RepoStat")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()
    res, err := ipfs.postCtx(ctx, "repo/stat?size-only=true", "", nil)
    if err != nil {
        return api.IPFSRepoStat{}, err
    }

    var stats api.IPFSRepoStat
    err = json.Unmarshal(res, &stats)
    if err != nil {
        logger.Error(err)
        return api.IPFSRepoStat{}, err
    }
    return stats, nil
}

// RepoGC performs a garbage collection sweep on the cluster peer's IPFS repo.
func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/RepoGC")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.RepoGCTimeout)
    defer cancel()

    body, err := ipfs.postCtxStreamResponse(ctx, "repo/gc?stream-errors=true", "", nil)
    if err != nil {
        return api.RepoGC{}, err
    }

    defer body.Close()

    dec := json.NewDecoder(body)
    repoGC := api.RepoGC{
        Keys: []api.IPFSRepoGC{},
    }
    for {
        resp := ipfsRepoGCResp{}

        if err := dec.Decode(&resp); err != nil {
            // If we canceled the request we should tell the user
            // (in case dec.Decode() exited cleanly with an EOF).
            select {
            case <-ctx.Done():
                return repoGC, ctx.Err()
            default:
                if err == io.EOF {
                    return repoGC, nil // clean exit
                }
                logger.Error(err)
                return repoGC, err // error decoding
            }
        }

        repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: api.NewCid(resp.Key), Error: resp.Error})
    }
}

// Resolve accepts an ipfs or ipns path and resolves it into a CID.
func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Resolve")
    defer span.End()

    validPath, err := gopath.NewPath(path)
    if err != nil {
        validPath, err = gopath.NewPath("/ipfs/" + path)
        if err != nil {
            logger.Error("could not parse path: " + err.Error())
            return api.CidUndef, err
        }
    }
    immPath, err := gopath.NewImmutablePath(validPath)
    if err == nil && len(immPath.Segments()) == 2 { // no need to resolve
        return api.NewCid(immPath.RootCid()), nil
    }

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()
    res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(validPath.String()), "", nil)
    if err != nil {
        return api.CidUndef, err
    }

    var resp ipfsResolveResp
    err = json.Unmarshal(res, &resp)
    if err != nil {
        logger.Error("could not unmarshal response: " + err.Error())
        return api.CidUndef, err
    }

    respPath, err := gopath.NewPath(resp.Path)
    if err != nil {
        logger.Error("invalid path in response: " + err.Error())
        return api.CidUndef, err
    }

    respImmPath, err := gopath.NewImmutablePath(respPath)
    if err != nil {
        logger.Error("resolved path is mutable: " + err.Error())
        return api.CidUndef, err
    }
    return api.NewCid(respImmPath.RootCid()), nil
}

// SwarmPeers returns the peers currently connected to this ipfs daemon.
func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/SwarmPeers")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()

    res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
    if err != nil {
        return nil, err
    }
    var peersRaw ipfsSwarmPeersResp
    err = json.Unmarshal(res, &peersRaw)
    if err != nil {
        logger.Error(err)
        return nil, err
    }

    swarm := make([]peer.ID, len(peersRaw.Peers))
    for i, p := range peersRaw.Peers {
        pID, err := peer.Decode(p.Peer)
        if err != nil {
            logger.Error(err)
            return swarm, err
        }
        swarm[i] = pID
    }
    return swarm, nil
}

// chanDirectory implements the files.Directory interface
type chanDirectory struct {
    iterator files.DirIterator
}

// Close is a no-op and it is not used.
func (cd *chanDirectory) Close() error {
    return nil
}

// Size is not implemented; it is not needed for the multipart request.
func (cd *chanDirectory) Size() (int64, error) {
    return 0, nil
}

func (cd *chanDirectory) Entries() files.DirIterator {
    return cd.iterator
}

// chanIterator implements the files.DirIterator interface.
type chanIterator struct {
    ctx    context.Context
    blocks <-chan api.NodeWithMeta

    current api.NodeWithMeta
    peeked  api.NodeWithMeta
    done    bool
    err     error

    seenMu sync.Mutex
    seen   map[string]int
}

func (ci *chanIterator) Name() string {
    if !ci.current.Cid.Defined() {
        return ""
    }
    return ci.current.Cid.String()
}

// Node returns a NewBytesFile for the current block.
// This function can be, and actually is, called multiple times for the
// same node by the multifile Reader when sending the multipart request.
func (ci *chanIterator) Node() files.Node {
    if !ci.current.Cid.Defined() {
        return nil
    }
    logger.Debugf("it.Node(): %s", ci.current.Cid)
    return files.NewBytesFile(ci.current.Data)
}

// Seen returns whether we have seen a multihash. It keeps count so it will
// return true as many times as we have seen it.
func (ci *chanIterator) Seen(c api.Cid) bool {
    ci.seenMu.Lock()
    n, ok := ci.seen[string(c.Cid.Hash())]
    logger.Debugf("Seen(): %s, %d, %t", c, n, ok)
    if ok {
        if n == 1 {
            delete(ci.seen, string(c.Cid.Hash()))
        } else {
            ci.seen[string(c.Cid.Hash())] = n - 1
        }
    }
    ci.seenMu.Unlock()
    return ok
}
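
// Illustrative semantics of Seen, assuming a block b that was sent twice
// (so the counter for its multihash is 2):
//
//  it.Seen(b.Cid) // true; count drops to 1
//  it.Seen(b.Cid) // true; entry is removed
//  it.Seen(b.Cid) // false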

func (ci *chanIterator) Done() bool {
    return ci.done
}

// Peek reads one block from the channel but saves it so that Next also
// returns it.
func (ci *chanIterator) Peek() (api.NodeWithMeta, bool) {
    if ci.done {
        return api.NodeWithMeta{}, false
    }

    select {
    case <-ci.ctx.Done():
        return api.NodeWithMeta{}, false
    case next, ok := <-ci.blocks:
        if !ok {
            return api.NodeWithMeta{}, false
        }
        ci.peeked = next
        return next, true
    }
}

func (ci *chanIterator) Next() bool {
    if ci.done {
        return false
    }

    seeBlock := func(b api.NodeWithMeta) {
        ci.seenMu.Lock()
        ci.seen[string(b.Cid.Hash())]++
        ci.seenMu.Unlock()
        stats.Record(ci.ctx, observations.BlocksAdded.M(1))
        stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(b.Data))))
    }

    if ci.peeked.Cid.Defined() {
        ci.current = ci.peeked
        ci.peeked = api.NodeWithMeta{}
        seeBlock(ci.current)
        return true
    }
    select {
    case <-ci.ctx.Done():
        ci.done = true
        ci.err = ci.ctx.Err()
        return false
    case next, ok := <-ci.blocks:
        if !ok {
            ci.done = true
            return false
        }
        // Record that we have seen this block. This has to be done
        // here, not in Node() as Node() is called multiple times per
        // block received.
        logger.Debugf("it.Next() %s", next.Cid)
        ci.current = next
        seeBlock(ci.current)
        return true
    }
}

func (ci *chanIterator) Err() error {
    return ci.err
}

func blockPutQuery(prefix cid.Prefix) (url.Values, error) {
    q := make(url.Values, 3)

    codec := multicodec.Code(prefix.Codec).String()
    if codec == "" {
        return q, fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
    }

    mhType, ok := multihash.Codes[prefix.MhType]
    if !ok {
        return q, fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
    }

    // Since go-ipfs 0.13.0, "format" is deprecated in favor of "cid-codec".
    q.Set("cid-codec", codec)
    q.Set("mhtype", mhType)
    q.Set("mhlen", strconv.Itoa(prefix.MhLength))
    q.Set("pin", "false")
    q.Set("allow-big-block", "true")
    return q, nil
}
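
// A sketch of the resulting query for a common CIDv1 dag-pb/sha2-256
// prefix (prefix values assumed for illustration):
//
//  q, _ := blockPutQuery(cid.Prefix{Version: 1, Codec: 0x70, MhType: 0x12, MhLength: 32})
//  // q.Encode() == "allow-big-block=true&cid-codec=dag-pb&mhlen=32&mhtype=sha2-256&pin=false"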

// BlockStream performs a multipart request to block/put with the blocks
// received on the channel.
func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWithMeta) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockStream")
    defer span.End()

    logger.Debug("streaming blocks to IPFS")
    defer ipfs.updateInformerMetric(ctx)

    it := &chanIterator{
        ctx:    ctx,
        blocks: blocks,
        seen:   make(map[string]int),
    }
    dir := &chanDirectory{
        iterator: it,
    }

    // We need to peek at the first block to know which CID prefix we are
    // writing blocks with, so that ipfs calculates the expected multihash
    // (we select the hash function used). This means that all blocks in a
    // stream should use the same prefix.
    peek, ok := it.Peek()
    if !ok {
        return errors.New("BlockStream: no blocks to peek in blocks channel")
    }

    q, err := blockPutQuery(peek.Cid.Prefix())
    if err != nil {
        return err
    }
    url := "block/put?" + q.Encode()

    // Now we stream the blocks to ipfs. In case of error, we return
    // directly, but leave a goroutine draining the channel until it is
    // closed, which should be soon after returning.
    stats.Record(ctx, observations.BlocksPut.M(1))
    multiFileR := files.NewMultiFileReader(dir, true, false)
    contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
    body, err := ipfs.postCtxStreamResponse(ctx, url, contentType, multiFileR)
    if err != nil {
        return err
    }
    defer body.Close()

    dec := json.NewDecoder(body)
    for {
        var res ipfsBlockPutResp
        err = dec.Decode(&res)
        if err == io.EOF {
            return nil
        }
        if err != nil {
            logger.Error(err)
            break
        }
        logger.Debugf("response block: %s", res.Key)
        if !it.Seen(res.Key) {
            logger.Warningf("blockPut response CID (%s) does not match the multihash of any blocks sent", res.Key)
        }
    }

    // keep draining blocks channel until closed.
    go func() {
        for range blocks {
        }
    }()

    if err != nil {
        stats.Record(ipfs.ctx, observations.BlocksAddedError.M(1))
    }
    return err
}

// BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet")
    defer span.End()

    ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
    defer cancel()
    url := "block/get?arg=" + c.String()
    return ipfs.postCtx(ctx, url, "", nil)
}

// // FetchRefs asks IPFS to download blocks recursively to the given depth.
// // It discards the response, but waits until it completes.
// func (ipfs *Connector) FetchRefs(ctx context.Context, c api.Cid, maxDepth int) error {
//     ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.PinTimeout)
//     defer cancel()

//     q := url.Values{}
//     q.Set("recursive", "true")
//     q.Set("unique", "false") // same memory on IPFS side
//     q.Set("max-depth", fmt.Sprintf("%d", maxDepth))
//     q.Set("arg", c.String())

//     url := fmt.Sprintf("refs?%s", q.Encode())
//     err := ipfs.postDiscardBodyCtx(ctx, url)
//     if err != nil {
//         return err
//     }
//     logger.Debugf("refs for %s successfully fetched", c)
//     return nil
// }

// shouldUpdateMetric returns true once every
// config.InformerTriggerInterval calls.
func (ipfs *Connector) shouldUpdateMetric() bool {
    if ipfs.config.InformerTriggerInterval <= 0 {
        return false
    }
    curCount := atomic.AddUint64(&ipfs.updateMetricCount, 1)
    if curCount%uint64(ipfs.config.InformerTriggerInterval) == 0 {
        atomic.StoreUint64(&ipfs.updateMetricCount, 0)
        return true
    }
    return false
}
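
// For example, with InformerTriggerInterval set to 10 (an assumed value),
// calls 1 through 9 return false, the 10th returns true and resets the
// counter, and the cycle repeats.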

// updateInformerMetric triggers a broadcast of the local informer metrics.
func (ipfs *Connector) updateInformerMetric(ctx context.Context) error {
    ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/updateInformerMetric")
    defer span.End()

    if !ipfs.shouldUpdateMetric() {
        return nil
    }

    err := ipfs.rpcClient.GoContext(
        ctx,
        "",
        "Cluster",
        "SendInformersMetrics",
        struct{}{},
        &struct{}{},
        nil,
    )
    if err != nil {
        logger.Error(err)
    }
    return err
}

// apiURL is a shorthand for building the base URL of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
    switch ipfs.nodeNetwork {
    case "unix":
        return "http+unix://ipfs/api/v0"
    default:
        return fmt.Sprintf("http://%s/api/v0", ipfs.nodeAddr)
    }
}
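
// For illustration (address values assumed): with nodeNetwork "unix" the
// base URL is "http+unix://ipfs/api/v0"; with a TCP node address such as
// "127.0.0.1:5001" it is "http://127.0.0.1:5001/api/v0". Request paths
// are appended to this base by doPostCtx below.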

func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
    logger.Debugf("posting /%s", path)
    urlstr := fmt.Sprintf("%s/%s", apiURL, path)

    req, err := http.NewRequest("POST", urlstr, postBody)
    if err != nil {
        logger.Error("error creating POST request:", err)
        return nil, err
    }

    req.Header.Set("Content-Type", contentType)
    req = req.WithContext(ctx)

    // Rate limiter. If we have a number of failed requests,
    // then wait for a tick.
    if failed := ipfs.failedRequests.Load(); failed > 0 {
        select {
        case <-ipfs.reqRateLimitCh:
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-ipfs.ctx.Done():
            return nil, ipfs.ctx.Err()
        }
    }

    res, err := ipfs.client.Do(req)
    if err != nil {
        // request error: ipfs was unreachable, record it.
        ipfs.failedRequests.Add(1)
        logger.Error("error posting to IPFS:", err)
    } else {
        ipfs.failedRequests.Store(0)
    }

    return res, err
}

// checkResponse tries to parse an error message on non-StatusOK responses
// from ipfs.
func checkResponse(path string, res *http.Response) ([]byte, error) {
    if res.StatusCode == http.StatusOK {
        return nil, nil
    }

    body, err := io.ReadAll(res.Body)
    res.Body.Close()
    if err == nil {
        var ipfsErr ipfsError
        if err := json.Unmarshal(body, &ipfsErr); err == nil {
            ipfsErr.code = res.StatusCode
            ipfsErr.path = path
            return body, ipfsErr
        }
    }

    // No error response with useful message from ipfs
    return nil, fmt.Errorf(
        "IPFS request failed (is it running?) (%s). Code %d: %s",
        path,
        res.StatusCode,
        string(body))
}

// postCtxStreamResponse makes a POST request against the ipfs daemon, and
// returns the body reader after checking the request for errors.
func (ipfs *Connector) postCtxStreamResponse(ctx context.Context, path string, contentType string, postBody io.Reader) (io.ReadCloser, error) {
    res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
    if err != nil {
        return nil, err
    }

    _, err = checkResponse(path, res)
    if err != nil {
        return nil, err
    }
    return res.Body, nil
}

// postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.
func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
    rdr, err := ipfs.postCtxStreamResponse(ctx, path, contentType, postBody)
    if err != nil {
        return nil, err
    }
    defer rdr.Close()

    body, err := io.ReadAll(rdr)
    if err != nil {
        logger.Errorf("error reading response body: %s", err)
        return nil, err
    }
    return body, nil
}