package main

import (

    ipfscluster "github.com/ipfs-cluster/ipfs-cluster"
    cli "github.com/urfave/cli/v2"

// printFirstStart holds the first-run guidance text: a "no clusters
// configured" notice followed by example invocations. The four %s
// placeholders line up with the four programName arguments on the last line.
//
// NOTE(review): the statement that opens this message (presumably a
// printf-style call plus the opening backquote of its raw string) and the
// function's closing brace are not present in this view. Restore them from
// version control before building.
func printFirstStart() {
No clusters configured yet!
If this is the first time you are running %s,
be sure to check out the usage documentation. Here are some
examples to get you going:

$ %s --help                     - general description and usage help
$ %s <clusterName> --help       - Help and subcommands for the <clusterName>'s follower peer
$ %s <clusterName> info --help  - Help for the "info" subcommand (same for others).
`, programName, programName, programName, programName)

// printNotInitialized holds the message shown for a cluster peer that has no
// configuration yet. The two %s placeholders are filled with programName and
// clusterName to form the suggested "init" command line.
//
// NOTE(review): the statement that opens this message (presumably a
// printf-style call plus the opening backquote of its raw string) and the
// function's closing brace are not present in this view. Restore them from
// version control before building.
func printNotInitialized(clusterName string) {
This cluster peer has not been initialized.

Try running "%s %s init <config-url>" first.
`, programName, clusterName)

func setLogLevels(lvl string) {
    for f := range ipfscluster.LoggingFacilities {
        ipfscluster.SetFacilityLogLevel(f, lvl)

    for f := range ipfscluster.LoggingFacilitiesExtra {
        ipfscluster.SetFacilityLogLevel(f, lvl)

// isInitialized reports whether the configuration folder at absPath exists.
// Any os.Stat failure (not only "does not exist") counts as "not
// initialized".
func isInitialized(absPath string) bool {
    _, err := os.Stat(absPath)
    return err == nil
}

func listClustersCmd(c *cli.Context) error {
    absPath, _, _ := buildPaths(c, "")
    f, err := os.Open(absPath)
    if os.IsNotExist(err) {
        return nil
    if err != nil {
        return cli.Exit(err, 1)

    dirs, err := f.Readdir(-1)
    if err != nil {
        return cli.Exit(errors.Wrapf(err, "reading %s", absPath), 1)

    var filteredDirs []string
    for _, d := range dirs {
        if d.IsDir() {
            configPath := filepath.Join(absPath, d.Name(), DefaultConfigFile)
            if _, err := os.Stat(configPath); err == nil {
                filteredDirs = append(filteredDirs, d.Name())

    if len(filteredDirs) == 0 {
        return nil

    fmt.Printf("Configurations found for %d follower peers. For info and help, try running:\n\n", len(filteredDirs))
    for _, d := range filteredDirs {
        fmt.Printf("%s \"%s\"\n", programName, d)
    fmt.Printf("\nTip: \"%s --help\" for help and examples.\n", programName)

    return nil

// infoCmd prints a summary for the follower peer of the cluster named by the
// "clusterName" option: its config folder, the state of its configuration
// source, whether the cluster peer answers a Version request and whether the
// IPFS peer answers an ID request. When c.Command.Name is empty it also
// prints the application help.
//
// It returns a cli exit error (code 1) when the peer is not initialized, when
// the configuration cannot be read for a reason other than a source-fetching
// failure, when the client cannot be created, or when environment variables
// cannot be applied to the ipfshttp configuration.
//
// NOTE(review): closing braces and a few statement lines of this function
// are not present in this view (see the notes below). Restore them from
// version control before building.
func infoCmd(c *cli.Context) error {
    // NOTE(review): other commands in this file read this option through
    // clusterNameFlag rather than a string literal; confirm both agree.
    clusterName := c.String("clusterName")

    // Avoid pollution of the screen
    // NOTE(review): no statement follows the comment above in this view.

    absPath, configPath, identityPath := buildPaths(c, clusterName)

    if !isInitialized(absPath) {
        return cli.Exit("", 1)

    cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
    var url string
    if err != nil {
        if config.IsErrFetchingSource(err) {
            // NOTE(review): the argument for the %s verb and the closing
            // parenthesis of this Sprintf call are not present in this view.
            url = fmt.Sprintf(
                "failed retrieving configuration source (%s)",
            // Fall back to an empty ipfshttp configuration so the IPFS
            // check below can still run.
            // NOTE(review): cfgHelper is used here although
            // NewLoadedConfigHelper returned an error; confirm it is
            // non-nil in that case.
            ipfsCfg := ipfshttp.Config{}
            cfgHelper.Configs().Ipfshttp = &ipfsCfg
        } else {
            return cli.Exit(errors.Wrapf(err, "reading the configurations in %s", absPath), 1)
    } else {
        url = fmt.Sprintf("Available (%s)", cfgHelper.Manager().Source)

    fmt.Printf("Information about follower peer for Cluster \"%s\":\n\n", clusterName)
    fmt.Printf("Config folder: %s\n", absPath)
    fmt.Printf("Config source URL: %s\n", url)

    ctx := context.Background()
    client, err := getClient(absPath, clusterName)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error creating client"), 1)
    // The peer is reported online when the Version request succeeds.
    _, err = client.Version(ctx)
    fmt.Printf("Cluster Peer online: %t\n", err == nil)

    // Either we loaded a valid config, or we are using a default. Worth
    // applying env vars in the second case.
    if err := cfgHelper.Configs().Ipfshttp.ApplyEnvVars(); err != nil {
        return cli.Exit(errors.Wrap(err, "applying environment variables to ipfshttp config"), 1)

    cfgHelper.Configs().Ipfshttp.ConnectSwarmsDelay = 0
    connector, err := ipfshttp.NewConnector(cfgHelper.Configs().Ipfshttp)
    // A connector creation failure is reported as "IPFS peer online: false"
    // rather than as an error.
    if err == nil {
        _, err = connector.ID(ctx)
    fmt.Printf("IPFS peer online: %t\n", err == nil)

    if c.Command.Name == "" {
        fmt.Printf("Additional help:\n\n")
        return cli.ShowAppHelp(c)
    return nil

func initCmd(c *cli.Context) error {
    if !c.Args().Present() {
        return cli.Exit("configuration URL not provided", 1)
    cfgURL := c.Args().First()

    return initCluster(c, false, cfgURL)

// initCluster writes a fresh identity and a default configuration whose
// source is cfgURL for the cluster named by the clusterNameFlag option.
//
// If the configuration folder already exists, the function returns nil after
// printing a notice when ignoreReinit is true, and a cli exit error (code 1)
// otherwise. A cfgURL that does not start with http:// or https:// is
// rewritten to http://<gateway>/ipns/<cfgURL>, where <gateway> is the value
// of the "gateway" option.
//
// NOTE(review): closing braces and part of one statement are not present in
// this view (see the note near the end). Restore them from version control
// before building.
func initCluster(c *cli.Context, ignoreReinit bool, cfgURL string) error {
    clusterName := c.String(clusterNameFlag)

    absPath, configPath, identityPath := buildPaths(c, clusterName)

    if isInitialized(absPath) {
        if ignoreReinit {
            fmt.Println("Configuration for this cluster already exists. Skipping initialization.")
            fmt.Printf("If you wish to re-initialize, simply delete %s\n\n", absPath)
            return nil
        cmdutils.ErrorOut("Configuration for this cluster already exists.\n")
        cmdutils.ErrorOut("Please delete %s if you wish to re-initialize.", absPath)
        return cli.Exit("", 1)

    gw := c.String("gateway")

    // Anything that is not an explicit http(s) URL is treated as a name to
    // resolve under /ipns/ through the gateway.
    if !strings.HasPrefix(cfgURL, "http://") && !strings.HasPrefix(cfgURL, "https://") {
        fmt.Printf("%s will be assumed to be an DNSLink-powered address: /ipns/%s.\n", cfgURL, cfgURL)
        fmt.Printf("It will be resolved using the local IPFS daemon's gateway (%s).\n", gw)
        fmt.Println("If this is not the case, specify the full url starting with http:// or https://.")
        fmt.Println("(You can override the gateway URL by setting IPFS_GATEWAY)")
        cfgURL = fmt.Sprintf("http://%s/ipns/%s", gw, cfgURL)

    // Setting the datastore here is useless, as we initialize with remote
    // config and we will have an empty service.json with the source only.
    // That source will decide which datastore is actually used.
    cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "crdt", "")
    cfgHelper.Manager().Source = cfgURL
    err := cfgHelper.Manager().Default()
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error generating default config"), 1)

    ident := cfgHelper.Identity()
    err = ident.Default()
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error generating identity"), 1)

    err = ident.ApplyEnvVars()
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error applying environment variables to the identity"), 1)

    // The identity is saved before the configuration.
    err = cfgHelper.SaveIdentityToDisk()
    if err != nil {
        return cli.Exit(errors.Wrapf(err, "error saving %s", identityPath), 1)
    fmt.Printf("Identity written to %s.\n", identityPath)

    err = cfgHelper.SaveConfigToDisk()
    if err != nil {
        return cli.Exit(errors.Wrapf(err, "saving %s", configPath), 1)

    fmt.Printf("Configuration written to %s.\n", configPath)
    fmt.Printf("Cluster \"%s\" follower peer initialized.\n\n", clusterName)
    // NOTE(review): the next line is a lone format-string argument; the call
    // that takes it and the values for its two %s verbs are not present in
    // this view.
        "You can now use \"%s %s run\" to start a follower peer for this cluster.\n",
    fmt.Println("(Remember to start your IPFS daemon before)")
    return nil

// runCmd starts a follower peer for the cluster named by the clusterNameFlag
// option and blocks in cmdutils.HandleSignals.
//
// Steps, in order: optionally initialize the configuration when the "init"
// option is set, wait up to two minutes for IPFS, set log levels, load the
// configuration, then build the state manager, datastore, libp2p host, REST
// API, IPFS connector, informers, allocator, CRDT component, tracker, peer
// monitor and tracing setup. Every construction failure is returned as a cli
// exit error with code 1.
//
// NOTE(review): closing braces and several statement lines of this function
// are not present in this view (see the notes below). Restore them from
// version control before building.
func runCmd(c *cli.Context) error {
    clusterName := c.String(clusterNameFlag)

    // With the "init" option set, initialize first; initCluster is called
    // with ignoreReinit=true so an existing configuration is not an error.
    if cfgURL := c.String("init"); cfgURL != "" {
        err := initCluster(c, true, cfgURL)
        if err != nil {
            return err

    absPath, configPath, identityPath := buildPaths(c, clusterName)

    if !isInitialized(absPath) {
        return cli.Exit("", 1)

    fmt.Printf("Starting the IPFS Cluster follower peer for \"%s\".\nCTRL-C to stop it.\n", clusterName)
    fmt.Println("Checking if IPFS is online (will wait for 2 minutes)...")
    ctxIpfs, cancelIpfs := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancelIpfs()
    // Any WaitForIPFS error is reported as a timeout; the underlying error
    // is not included in the message.
    err := cmdutils.WaitForIPFS(ctxIpfs)
    if err != nil {
        return cli.Exit("timed out waiting for IPFS to be available", 1)

    setLogLevels(logLevel) // set to "info" by default.
    // Avoid API logs polluting the screen everytime we
    // run some "list" command.
    ipfscluster.SetFacilityLogLevel("restapilog", "error")

    cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
    if err != nil {
        return cli.Exit(errors.Wrapf(err, "reading the configurations in %s", absPath), 1)
    cfgs := cfgHelper.Configs()

    stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.GetDatastore(), cfgHelper.Identity(), cfgs)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating state manager"), 1)

    store, err := stmgr.GetStore()
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating datastore"), 1)

    // ctx/cancel are also handed to HandleSignals at the end of the function.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster, store)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error creating libp2p components"), 1)

    // Always run followers in follower mode.
    cfgs.Cluster.FollowerMode = true
    // Do not let trusted peers GC this peer
    // Defaults to Trusted otherwise.
    cfgs.Cluster.RPCPolicy["Cluster.RepoGCLocal"] = ipfscluster.RPCClosed

    // Discard API configurations and create our own (unix socket)
    apiCfg := rest.NewConfig()
    cfgs.Restapi = apiCfg
    _ = apiCfg.Default()
    listenSocket, err := socketAddress(absPath, clusterName)
    if err != nil {
        return cli.Exit(err, 1)
    apiCfg.HTTPListenAddr = []multiaddr.Multiaddr{listenSocket}
    // Allow customization via env vars
    err = apiCfg.ApplyEnvVars()
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error applying environmental variables to restapi configuration"), 1)

    // From here on the local variable rest shadows the rest package.
    rest, err := rest.NewAPI(ctx, apiCfg)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating REST API component"), 1)

    connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating IPFS Connector component"), 1)

    // An informer is only created when its configuration section was loaded
    // from JSON.
    var informers []ipfscluster.Informer
    if cfgHelper.Manager().IsLoadedFromJSON(config.Informer, cfgs.DiskInf.ConfigKey()) {
        diskInf, err := disk.NewInformer(cfgs.DiskInf)
        if err != nil {
            return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
        informers = append(informers, diskInf)
    if cfgHelper.Manager().IsLoadedFromJSON(config.Informer, cfgs.TagsInf.ConfigKey()) {
        tagsInf, err := tags.New(cfgs.TagsInf)
        if err != nil {
            // NOTE(review): the message says "numpin" although this branch
            // builds the tags informer; confirm the wording.
            return cli.Exit(errors.Wrap(err, "creating numpin informer"), 1)
        informers = append(informers, tagsInf)

    if cfgHelper.Manager().IsLoadedFromJSON(config.Informer, cfgs.PinQueueInf.ConfigKey()) {
        pinQueueInf, err := pinqueue.New(cfgs.PinQueueInf)
        if err != nil {
            return cli.Exit(errors.Wrap(err, "creating pinqueue informer"), 1)
        informers = append(informers, pinQueueInf)

    alloc, err := balanced.New(cfgs.BalancedAlloc)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating metrics allocator"), 1)

    // NOTE(review): the argument list and closing parenthesis of crdt.New
    // are not present in this view.
    crdtcons, err := crdt.New(
    if err != nil {
        return cli.Exit(errors.Wrap(err, "creating CRDT component"), 1)

    tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, crdtcons.State)

    mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, nil)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "setting up PeerMonitor"), 1)

    // Hardcode disabled tracing and metrics to avoid mistakenly
    // exposing any user data.
    tracerCfg := observations.TracingConfig{}
    _ = tracerCfg.Default()
    tracerCfg.EnableTracing = false
    cfgs.Tracing = &tracerCfg
    tracer, err := observations.SetupTracing(&tracerCfg)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error setting up tracer"), 1)

    // This does nothing since we are not calling SetupMetrics anyways
    // But stays just to be explicit.
    metricsCfg := observations.MetricsConfig{}
    _ = metricsCfg.Default()
    metricsCfg.EnableStats = false
    cfgs.Metrics = &metricsCfg

    // We are going to run a cluster peer and should do an
    // orderly shutdown if we are interrupted: cancel default
    // signal handling and leave things to HandleSignals.
    // NOTE(review): no statement follows the comment above in this view.

    // NOTE(review): the argument list and closing parenthesis of
    // ipfscluster.NewCluster are not present in this view.
    cluster, err := ipfscluster.NewCluster(
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error creating cluster peer"), 1)

    return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht, store)

// listCmd prints the pins of the follower peer for the cluster named by the
// "clusterName" option. It first asks the running peer through
// printStatusOnline. If that fails for a reason other than an API error with
// a non-zero code, it falls back to reading the local datastore through
// printStatusOffline.
//
// It returns a cli exit error (code 1) when the peer is not initialized,
// when the API answered with a non-zero error code, when both leveldb and
// badger folders exist, or when the offline listing fails.
//
// NOTE(review): closing braces and part of one statement are not present in
// this view (see the notes below). Restore them from version control before
// building.
func listCmd(c *cli.Context) error {
    // NOTE(review): other commands in this file read this option through
    // clusterNameFlag rather than a string literal; confirm both agree.
    clusterName := c.String("clusterName")

    absPath, configPath, identityPath := buildPaths(c, clusterName)
    if !isInitialized(absPath) {
        return cli.Exit("", 1)

    err := printStatusOnline(absPath, clusterName)
    if err == nil {
        return nil

    // There was an error. Try offline status
    apiErr, ok := err.(*api.Error)
    if ok && apiErr.Code != 0 {
        // NOTE(review): the expression that wraps this format string and
        // the value for its %d verb are not present in this view.
        return cli.Exit(
                "The Peer API seems to be running but returned with code %d",
            ), 1)

    // We are on offline mode so we cannot rely on IPFS being
    // running and most probably our configuration is remote and
    // to be loaded from IPFS. Thus we need to find a different
    // way to decide whether to load badger/leveldb, and once we
    // know, do it with the default settings.
    hasLevelDB := false
    lDBCfg := &leveldb.Config{}
    levelDBInfo, err := os.Stat(lDBCfg.GetFolder())
    if err == nil && levelDBInfo.IsDir() {
        hasLevelDB = true

    hasBadger := false
    badgerCfg := &badger.Config{}
    badgerInfo, err := os.Stat(badgerCfg.GetFolder())
    if err == nil && badgerInfo.IsDir() {
        hasBadger = true

    // NOTE(review): hasBadger is only true when the os.Stat call just above
    // returned a nil error, so err is nil whenever this branch runs. If
    // errors is github.com/pkg/errors, Wrapf returns nil for a nil error and
    // the message below would be dropped. Confirm, and consider building the
    // error with errors.Errorf instead.
    if hasLevelDB && hasBadger {
        return cli.Exit(errors.Wrapf(err, "found both leveldb (%s) and badger (%s) folders: cannot determine which to use in offline mode", lDBCfg.GetFolder(), badgerCfg.GetFolder()), 1)

    // Since things were initialized, assume there is one at least.
    dstoreType := "leveldb"
    if hasBadger {
        dstoreType = "badger"
    cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "crdt", dstoreType)
    cfgHelper.Manager().Shutdown() // not needed
    // The return value of Default() is discarded here.
    cfgHelper.Manager().Default() // we have a default crdt config with either leveldb or badger registered.

    err = printStatusOffline(cfgHelper)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error obtaining the pinset"), 1)

    return nil

func printStatusOnline(absPath, clusterName string) error {
    ctx := context.Background()
    client, err := getClient(absPath, clusterName)
    if err != nil {
        return cli.Exit(errors.Wrap(err, "error creating client"), 1)

    out := make(chan api.GlobalPinInfo, 1024)
    errCh := make(chan error, 1)

    go func() {
        defer close(errCh)
        errCh <- client.StatusAll(ctx, 0, true, out)

    var pid string
    for gpi := range out {
        if pid == "" { // do this once
            // PeerMap will only have one key
            for k := range gpi.PeerMap {
                pid = k
        pinInfo := gpi.PeerMap[pid]
        printPin(gpi.Cid, pinInfo.Status.String(), gpi.Name, pinInfo.Error)
    err = <-errCh
    return err

func printStatusOffline(cfgHelper *cmdutils.ConfigHelper) error {
    mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
    if err != nil {
        return err
    store, err := mgr.GetStore()
    if err != nil {
        return err
    defer store.Close()
    st, err := mgr.GetOfflineState(store)
    if err != nil {
        return err

    out := make(chan api.Pin, 1024)
    errCh := make(chan error, 1)
    go func() {
        defer close(errCh)
        errCh <- st.List(context.Background(), out)

    for pin := range out {
        printPin(pin.Cid, "offline", pin.Name, "")

    err = <-errCh
    return err

func printPin(c api.Cid, status, name, err string) {
    if err != "" {
        name = name + " (" + err + ")"
    fmt.Printf("%-20s %s %s\n", status, c, name)