// The ipfs-cluster-service application.
package main

import (

    ipfslite "github.com/hsanjuan/ipfs-lite"
    ipfscluster "github.com/ipfs-cluster/ipfs-cluster"
    peer "github.com/libp2p/go-libp2p/core/peer"
    ma "github.com/multiformats/go-multiaddr"

    semver "github.com/blang/semver"
    dscrdt "github.com/ipfs/go-ds-crdt"
    logging "github.com/ipfs/go-log/v2"
    cli "github.com/urfave/cli"

// programName is the name of this application. main uses it as the
// name of the command-line app.
const programName = "ipfs-cluster-service"

// Default values for command-line flags.
const (
    // defaultLogLevel is the overall log level applied by setupLogLevel
    // when the user does not provide one.
    defaultLogLevel = "info"
    // defaultConsensus is the default value of the "init --consensus" flag.
    defaultConsensus = "crdt"
    // defaultDatastore is the default value of the "init --datastore" flag.
    defaultDatastore = "pebble"
)

// Warnings shown in confirmation prompts before destructive actions.
const (
    // stateCleanupPrompt warns that the peer state is about to be removed.
    stateCleanupPrompt = "The peer state will be removed.  Existing pins may be lost."
    // configurationOverwritePrompt warns that the configuration file is
    // about to be overwritten.
    configurationOverwritePrompt = "The configuration file will be overwritten."
)

// commit holds a commit id. init() turns it into semver build metadata
// ("git" + commit) on version.Version when it is a valid build identifier.
// NOTE(review): presumably injected at build time (e.g. via -ldflags);
// nothing in this file assigns it — confirm against the build scripts.
var commit string

// Description provides a short summary of the functionality of this tool.
// main assigns it to the application description, so it is what users see
// in the help output. The text is a fmt.Sprintf template with six %s verbs.
// NOTE(review): from the surrounding prose the verbs look like the program
// name (twice) followed by folder/file name pairs for the configuration and
// identity files — confirm against the Sprintf arguments.
var Description = fmt.Sprintf(`
%s runs an IPFS Cluster peer.

A peer participates in the cluster consensus, follows a distributed log
of pinning and unpinning requests and manages pinning operations to a
configured IPFS daemon.

This peer also provides an API for cluster management, an IPFS Proxy API which
forwards requests to IPFS and a number of components for internal communication
using LibP2P. This is a simplified view of the components:

             | ipfs-cluster-ctl |
                       | HTTP(s)
ipfs-cluster-service   |                           HTTP
+----------+--------+--v--+----------------------+      +-------------+
| RPC      | Peer 1 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----^-----+--------+-----+----------------------+      +-------------+
     | libp2p
+----v-----+--------+-----+----------------------+      +-------------+
| RPC      | Peer 2 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----^-----+--------+-----+----------------------+      +-------------+
+----v-----+--------+-----+----------------------+      +-------------+
| RPC      | Peer 3 | API | IPFS Connector/Proxy +------> IPFS daemon |
+----------+--------+-----+----------------------+      +-------------+

%s needs valid configuration and identity files to run.
These are independent from IPFS. The identity includes its own
libp2p key-pair. They can be initialized with "init" and their
default locations are ~/%s/%s
and ~/%s/%s.

For feedback, bug reports or any additional information, visit


Initial configuration:

$ ipfs-cluster-service init

Launch a cluster:

$ ipfs-cluster-service daemon

Launch a peer and join existing cluster:

$ ipfs-cluster-service daemon --bootstrap /ip4/

Customize logs using --loglevel flag. To customize component-level
logging pass a comma-separated list of component-identifer:log-level
pair or without identifier for overall loglevel. Valid loglevels
are critical, error, warning, notice, info and debug.

$ ipfs-cluster-service --loglevel info,cluster:debug,pintracker:debug daemon

// logger is the logger of the "service" facility. The commands in this file
// use it for informational messages; its level is set in setupLogLevel.
var logger = logging.Logger("service")

// Default location for the configurations and data
var (
    // DefaultFolder is the name of the cluster folder
    DefaultFolder = ".ipfs-cluster"
    // DefaultPath is set on init() to $HOME/DefaultFolder
    // and holds all the ipfs-cluster data
    DefaultPath string
    // DefaultConfigFile is the name of the configuration file inside
    // DefaultPath
    DefaultConfigFile = "service.json"
    // DefaultIdentityFile is the name of the identity file inside
    // DefaultPath
    DefaultIdentityFile = "identity.json"
)

// Paths of the files used by the commands. They are filled in by the
// app.Before hook in main from the folder given with --config.
var (
    // configPath is the absolute path of the configuration file.
    configPath string
    // identityPath is the absolute path of the identity file.
    identityPath string
)

func init() {
    // Set build information.
    if build, err := semver.NewBuildVersion(commit); err == nil {
        version.Version.Build = []string{"git" + build}

    // We try guessing user's home from the HOME variable. This
    // allows HOME hacks for things like Snapcraft builds. HOME
    // should be set in all UNIX by the OS. Alternatively, we fall back to
    // usr.HomeDir (which should work on Windows etc.).
    home := os.Getenv("HOME")
    if home == "" {
        usr, err := user.Current()
        if err != nil {
            panic(fmt.Sprintf("cannot get current user: %s", err))
        home = usr.HomeDir

    DefaultPath = filepath.Join(home, DefaultFolder)

// out writes a formatted message to standard error.
//
// m is a fmt-style format string and a holds its arguments. Standard error
// is used so that diagnostics never mix with data that commands write to
// standard output (for example "state export").
func out(m string, a ...interface{}) {
    fmt.Fprintf(os.Stderr, m, a...)
}

func checkErr(doing string, err error, args ...interface{}) {
    if err != nil {
        if len(args) > 0 {
            doing = fmt.Sprintf(doing, args...)
        out("error %s: %s\n", doing, err)
        err = locker.tryUnlock()
        if err != nil {
            out("error releasing execution lock: %s\n", err)

// main builds the command-line application: name, usage, description,
// version, global flags, a Before hook that resolves paths and logging, and
// the "init", "daemon", "state" and "version" commands. Running without a
// command uses the run function as the default action.
func main() {
    app := cli.NewApp()
    app.Name = programName
    app.Usage = "IPFS Cluster peer"
    app.Description = Description
    //app.Copyright = "© Protocol Labs, Inc."
    app.Version = version.Version.String()
    // Global flags, read by the Before hook and by the commands.
    app.Flags = []cli.Flag{
            // --config/-c: folder with configuration and data. Defaults to
            // DefaultPath and can also be set with IPFS_CLUSTER_PATH.
            Name:   "config, c",
            Value:  DefaultPath,
            Usage:  "path to the configuration and data `FOLDER`",
            EnvVar: "IPFS_CLUSTER_PATH",
            // --force/-f: skip confirmation for some actions.
            Name:  "force, f",
            Usage: "forcefully proceed with some actions. i.e. overwriting configuration",
            // --debug/-d: passed to setupLogLevel, where it overrides --loglevel.
            Name:  "debug, d",
            Usage: "enable full debug logging (very verbose)",
            // --loglevel/-l: comma-separated "level" or "component:level"
            // entries, parsed by setupLogLevel. Can also be set with
            // IPFS_CLUSTER_LOG_LEVEL.
            Name:   "loglevel, l",
            EnvVar: "IPFS_CLUSTER_LOG_LEVEL",
            Usage:  "set overall and component-wise log levels",

    // Before hook: prepares the shared state that the commands read
    // (configPath, identityPath, log levels and the execution lock).
    app.Before = func(c *cli.Context) error {
        // Work with an absolute folder so that derived paths do not depend
        // on the current working directory.
        absPath, err := filepath.Abs(c.String("config"))
        if err != nil {
            return err

        configPath = filepath.Join(absPath, DefaultConfigFile)
        identityPath = filepath.Join(absPath, DefaultIdentityFile)

        // --debug takes precedence over --loglevel inside setupLogLevel.
        err = setupLogLevel(c.Bool("debug"), c.String("loglevel"))
        if err != nil {
            return err
        // The lock is scoped to the configuration folder.
        locker = &lock{path: absPath}

        return nil

    // Command tree of the application.
    app.Commands = []cli.Command{
            // "init": writes the configuration file, a new identity when
            // none exists yet, and a peerstore file.
            Name:  "init",
            Usage: "Creates a configuration and generates an identity",
            Description: fmt.Sprintf(`
This command will initialize a new %s configuration file and, if it
does already exist, generate a new %s for %s.

If the optional [source-url] is given, the generated configuration file
will refer to it. The source configuration will be fetched from its source
URL during the launch of the daemon. If not, a default standard configuration
file will be created.

In the latter case, a cluster secret will be generated as required
by %s. Alternatively, this secret can be manually
provided with --custom-secret (in which case it will be prompted), or
by setting the CLUSTER_SECRET environment variable.

The --consensus flag allows to select an alternative consensus components for
in the newly-generated configuration.

Note that the --force flag allows to overwrite an existing
configuration with default values. To generate a new identity, please
remove the %s file first and clean any Raft state.

By default, an empty peerstore file will be created too. Initial contents can
be provided with the --peers flag. Depending on the chosen consensus, the
"trusted_peers" list in the "crdt" configuration section and the
"init_peerset" list in the "raft" configuration section will be prefilled to
the peer IDs in the given multiaddresses.

            // Optional positional argument: the URL stored as the
            // configuration source (see Manager().Source below).
            ArgsUsage: "[http-source-url]",
            // Flags of "init".
            Flags: []cli.Flag{
                    Name:  "consensus",
                    Usage: "select consensus: 'crdt' or 'raft'",
                    Value: defaultConsensus,
                    Name:  "datastore",
                    Usage: "select datastore: 'badger', 'badger3', 'leveldb' or 'pebble'",
                    Value: defaultDatastore,
                    Name:  "custom-secret, s",
                    Usage: "prompt for the cluster secret (when no source specified)",
                    Name:  "peers",
                    Usage: "comma-separated list of multiaddresses to init with (see help)",
                    Name:  "force, f",
                    Usage: "overwrite configuration without prompting",
                    Name:  "randomports",
                    Usage: "configure random ports to listen on instead of defaults",
            // Action of "init": validates the flags, builds a default
            // configuration, applies user overrides and writes the
            // configuration, identity and peerstore to disk.
            Action: func(c *cli.Context) error {
                // Validate --consensus: the error message names 'raft' and
                // 'crdt' as the only accepted values.
                consensus := c.String("consensus")
                switch consensus {
                case "raft", "crdt":
                    checkErr("choosing consensus", errors.New("flag value must be set to 'raft' or 'crdt'"))

                // Validate --datastore in the same way.
                datastore := c.String("datastore")
                switch datastore {
                case "leveldb", "badger", "badger3", "pebble":
                    checkErr("choosing datastore", errors.New("flag value must be set to 'leveldb', 'badger', 'badger3' or 'pebble'"))

                cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, consensus, datastore)
                defer cfgHelper.Manager().Shutdown() // wait for saves

                // A file counts as existing unless Stat reports "not exist";
                // any other Stat error (e.g. permissions) is treated as
                // "exists" too.
                configExists := false
                if _, err := os.Stat(configPath); !os.IsNotExist(err) {
                    configExists = true

                identityExists := false
                if _, err := os.Stat(identityPath); !os.IsNotExist(err) {
                    identityExists = true

                if configExists || identityExists {
                    // cluster might be running
                    // acquire lock for config folder
                    defer locker.tryUnlock()

                // Overwriting an existing configuration needs confirmation.
                if configExists {
                    confirm := fmt.Sprintf(
                        "%s Continue? [y/n]:",

                    // --force allows override of the prompt
                    if !c.Bool("force") {
                        if !yesNoPrompt(confirm) {
                            return nil

                // Set url. If exists, it will be the only thing saved.
                cfgHelper.Manager().Source = c.Args().First()

                // Generate defaults for all registered components
                err := cfgHelper.Manager().Default()
                checkErr("generating default configuration", err)

                // --randomports: replace the listen addresses of the cluster,
                // REST API, IPFS proxy and pinning service API components.
                if c.Bool("randomports") {
                    cfgs := cfgHelper.Configs()

                    cfgs.Cluster.ListenAddr, err = cmdutils.RandomizePorts(cfgs.Cluster.ListenAddr)
                    checkErr("randomizing ports", err)
                    cfgs.Restapi.HTTPListenAddr, err = cmdutils.RandomizePorts(cfgs.Restapi.HTTPListenAddr)
                    checkErr("randomizing ports", err)
                    cfgs.Ipfsproxy.ListenAddr, err = cmdutils.RandomizePorts(cfgs.Ipfsproxy.ListenAddr)
                    checkErr("randomizing ports", err)
                    cfgs.Pinsvcapi.HTTPListenAddr, err = cmdutils.RandomizePorts(cfgs.Pinsvcapi.HTTPListenAddr)
                    checkErr("randomizing ports", err)
                // Environment variables are applied after the defaults (and
                // random ports), so they take precedence over both.
                err = cfgHelper.Manager().ApplyEnvVars()
                checkErr("applying environment variables to configuration", err)

                // Prompt for a secret only when --custom-secret is set and no
                // source URL argument was given.
                userSecret, userSecretDefined := userProvidedSecret(c.Bool("custom-secret") && !c.Args().Present())
                // Set user secret
                if userSecretDefined {
                    cfgHelper.Configs().Cluster.Secret = userSecret

                // --peers: parse the comma-separated multiaddresses.
                peersOpt := c.String("peers")
                var multiAddrs []ma.Multiaddr
                if peersOpt != "" {
                    addrs := strings.Split(peersOpt, ",")

                    for _, addr := range addrs {
                        addr = strings.TrimSpace(addr)
                        multiAddr, err := ma.NewMultiaddr(addr)
                        checkErr("parsing peer multiaddress: "+addr, err)
                        multiAddrs = append(multiAddrs, multiAddr)

                    // Giving peers switches crdt from trusting everyone to
                    // trusting exactly these peers; the same peers seed the
                    // raft initial peerset.
                    peers := ipfscluster.PeersFromMultiaddrs(multiAddrs)
                    cfgHelper.Configs().Crdt.TrustAll = false
                    cfgHelper.Configs().Crdt.TrustedPeers = peers
                    cfgHelper.Configs().Raft.InitPeerset = peers

                // Save config. Creates the folder.
                // Sets BaseDir in components.
                checkErr("saving default configuration", cfgHelper.SaveConfigToDisk())
                out("configuration written to %s.\n", configPath)

                // An existing identity is never replaced by "init".
                if !identityExists {
                    ident := cfgHelper.Identity()
                    err := ident.Default()
                    checkErr("generating an identity", err)

                    err = ident.ApplyEnvVars()
                    checkErr("applying environment variables to the identity", err)

                    err = cfgHelper.SaveIdentityToDisk()
                    checkErr("saving "+DefaultIdentityFile, err)
                    out("new identity written to %s\n", identityPath)

                // Initialize peerstore file - even if empty
                peerstorePath := cfgHelper.Configs().Cluster.GetPeerstorePath()
                peerManager := pstoremgr.New(context.Background(), nil, peerstorePath)
                addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
                checkErr("getting AddrInfos from peer multiaddresses", err)
                err = peerManager.SavePeerstore(addrInfos)
                checkErr("saving peers to peerstore", err)
                // The reported count is the number of multiaddresses given,
                // not the number of AddrInfos saved.
                if l := len(multiAddrs); l > 0 {
                    out("peerstore written to %s with %d entries.\n", peerstorePath, len(multiAddrs))
                } else {
                    out("new empty peerstore written to %s.\n", peerstorePath)

                return nil
            // "daemon": runs the cluster peer. The work is done by the
            // daemon function, which is defined outside this section.
            Name:  "daemon",
            Usage: "Runs the IPFS Cluster peer (default)",
            Flags: []cli.Flag{
                    // --upgrade is kept for compatibility; its usage text
                    // marks it as deprecated/unused.
                    Name:  "upgrade, u",
                    Usage: "run state migrations before starting (deprecated/unused)",
                    Name:  "bootstrap, j",
                    Usage: "join a cluster providing a comma-separated list of existing peers multiaddress(es)",
                    // --leave is hidden from the help output.
                    Name:   "leave, x",
                    Usage:  "remove peer from cluster on exit. Overrides \"leave_on_shutdown\"",
                    Hidden: true,
                    Name:  "stats",
                    Usage: "enable stats collection",
                    Name:  "tracing",
                    Usage: "enable tracing collection",
                    Name:  "no-trust",
                    Usage: "do not trust bootstrap peers (only for \"crdt\" consensus)",
            Action: daemon,
            // "state": commands that operate on the persisted pinset.
            Name:  "state",
            Usage: "Manages the peer's persistent state (pinset)",
            Subcommands: []cli.Command{
                    // "state crdt": commands specific to the CRDT store.
                    Name:  "crdt",
                    Usage: "CRDT-state commands",
                    // Before hook: refuses to continue unless the peer was
                    // initialized with crdt consensus.
                    Before: func(c *cli.Context) error {
                        // Load all the configurations and identity
                        cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
                        // NOTE(review): Configs() is called before err is
                        // checked — confirm cfgHelper is usable (non-nil)
                        // when loading fails.
                        cfgs := cfgHelper.Configs()
                        checkErr("loading configurations", err)
                        defer cfgHelper.Manager().Shutdown()

                        if cfgHelper.GetConsensus() != cfgs.Crdt.ConfigKey() {
                            checkErr("", errors.New("crdt subcommands can only be run on peers initialized with crdt consensus"))
                        return nil

                    Subcommands: []cli.Command{
                            // "state crdt info": prints statistics of the
                            // CRDT store.
                            Name:  "info",
                            Usage: "Print information about the CRDT store",
                            Description: `
This commands prints basic information: current heads, dirty flag etc.
                            // "info" takes no flags.
                            Flags: []cli.Flag{},
                            Action: func(c *cli.Context) error {
                                // Release the execution lock when done.
                                defer locker.tryUnlock()

                                // Open the CRDT datastore and read its
                                // internal statistics.
                                crdt := getCrdt()
                                info := crdt.InternalStats()
                                    "Number of heads: %d. Current max-height: %d. Dirty: %t\nHeads: %s",
                                return nil
                            // "state crdt dot": writes the CRDT-DAG in DOT
                            // format to stdout or to a file.
                            Name:  "dot",
                            Usage: "Write the CRDT-DAG as DOT file",
                            Description: `
This command generates a DOT file representing the CRDT-DAG of this node.
The DOT file can then be visualized, converted to SVG etc.

This is a debugging command to visualize how the DAG looks like, whether there
is a lot of branching etc. large DAGs will generate large DOT files.
Use with caution!
                            Flags: []cli.Flag{
                                    // --file/-f: output path; empty means
                                    // stdout.
                                    Name:  "file, f",
                                    Value: "",
                                    Usage: "writes to file instead of stdout",
                            Action: func(c *cli.Context) error {
                                // Release the execution lock when done.
                                defer locker.tryUnlock()

                                crdt := getCrdt()

                                var err error
                                var w io.WriteCloser
                                outputPath := c.String("file")
                                if outputPath == "" {
                                    // Output to stdout
                                    w = os.Stdout
                                } else {
                                    // Create the export file
                                    w, err = os.Create(outputPath)
                                    checkErr("creating output file", err)

                                // 256KiB of buffer size.
                                buf := bufio.NewWriterSize(w, 1<<18)
                                // Flush whatever is still buffered when the
                                // action returns.
                                // NOTE(review): no Close of w is visible
                                // here, and the Flush error is dropped —
                                // confirm this is acceptable for file output.
                                defer buf.Flush()

                                logger.Info("initiating CDRT-DAG DOT file export. Export might take a long time on large graphs")
                                checkErr("generating graph", crdt.DotDAG(buf))
                                // NOTE(review): this message looks
                                // truncated — confirm the intended text.
                                logger.Info("dot file ")
                                return nil

                            // "state crdt mark-dirty": flags the CRDT store
                            // as dirty.
                            Name:  "mark-dirty",
                            Usage: "Marks the CRDT-store as dirty",
                            Description: `
Marking the CRDT store as dirty will force-run a Repair operation on the next
run (i.e. next time the cluster peer is started).
                            // "mark-dirty" takes no flags.
                            Flags: []cli.Flag{},
                            Action: func(c *cli.Context) error {
                                // Release the execution lock when done.
                                defer locker.tryUnlock()

                                // Open the CRDT datastore.
                                // NOTE(review): no call that actually marks
                                // the store is visible between opening it
                                // and printing the message — confirm.
                                crdt := getCrdt()
                                fmt.Println("Datastore marked 'dirty'")
                                return nil
                            // "state crdt mark-clean": removes the dirty
                            // mark from the CRDT store.
                            Name:  "mark-clean",
                            Usage: "Marks the CRDT-store as clean",
                            Description: `
This command remove the dirty-mark on the CRDT-store, which means no
DAG operations will be run.
                            // "mark-clean" takes no flags.
                            Flags: []cli.Flag{},
                            Action: func(c *cli.Context) error {
                                // Release the execution lock when done.
                                defer locker.tryUnlock()

                                // Open the CRDT datastore.
                                // NOTE(review): no call that actually clears
                                // the dirty mark is visible between opening
                                // the store and printing the message —
                                // confirm.
                                crdt := getCrdt()
                                fmt.Println("Datastore marked 'clean'")
                                return nil

                    // "state export": dumps the pinset through the state
                    // manager to stdout or to a file.
                    Name:  "export",
                    Usage: "save the state to a JSON file",
                    Description: `
This command dumps the current cluster pinset (state) as a JSON file. The
resulting file can be used to migrate, restore or backup a Cluster peer.
By default, the state will be printed to stdout.
                    Flags: []cli.Flag{
                            // --file/-f: output path; empty means stdout.
                            Name:  "file, f",
                            Value: "",
                            Usage: "writes to an output file",
                    Action: func(c *cli.Context) error {
                        // Release the execution lock when done.
                        defer locker.tryUnlock()

                        mgr := getStateManager()

                        var w io.WriteCloser
                        var err error
                        outputPath := c.String("file")
                        if outputPath == "" {
                            // Output to stdout
                            w = os.Stdout
                        } else {
                            // Create the export file
                            w, err = os.Create(outputPath)
                            checkErr("creating output file", err)

                        // Export through a buffered writer.
                        buf := bufio.NewWriter(w)
                        // NOTE(review): the body of this deferred function
                        // is not visible here. The buffer has to be flushed
                        // (and a created file closed) for the export to be
                        // complete — confirm that is what it does.
                        defer func() {
                        checkErr("exporting state", mgr.ExportState(buf))
                        logger.Info("state successfully exported")
                        return nil
                    // "state import": replaces the pinset with the contents
                    // of an export, read from a file argument or from stdin.
                    Name:  "import",
                    Usage: "load the state from a file produced by 'export'",
                    Description: `
This command reads in an exported pinset (state) file and replaces the
existing one. This can be used, for example, to restore a Cluster peer from a

If an argument is provided, it will be treated it as the path of the file
to import. If no argument is provided, stdin will be used.
                    Flags: []cli.Flag{
                            Name:  "force, f",
                            Usage: "skips confirmation prompt",
                            // The replication and allocation flags below are
                            // passed to ImportState as pin options.
                            Name:  "replication-min, rmin",
                            Value: 0,
                            Usage: "Overwrite replication-factor-min for all pins on import",
                            Name:  "replication-max, rmax",
                            Value: 0,
                            Usage: "Overwrite replication-factor-max for all pins on import",
                            Name:  "allocations, allocs",
                            Usage: "Overwrite allocations for all pins on import. Comma-separated list of peer IDs",
                    Action: func(c *cli.Context) error {
                        // Release the execution lock when done.
                        defer locker.tryUnlock()

                        // Replacing the pinset needs confirmation unless
                        // --force is given.
                        confirm := "The pinset (state) of this peer "
                        confirm += "will be replaced. Continue? [y/n]:"
                        if !c.Bool("force") && !yesNoPrompt(confirm) {
                            return nil

                        // importState allows overwriting of some options on import
                        // NOTE(review): without --allocations, strings.Split
                        // returns a slice holding one empty string — confirm
                        // api.StringsToPeers ignores it.
                        opts := api.PinOptions{
                            ReplicationFactorMin: c.Int("replication-min"),
                            ReplicationFactorMax: c.Int("replication-max"),
                            UserAllocations:      api.StringsToPeers(strings.Split(c.String("allocations"), ",")),

                        mgr := getStateManager()

                        // Get the importing file path
                        importFile := c.Args().First()
                        var r io.ReadCloser
                        var err error
                        if importFile == "" {
                            // No argument: read the state from stdin.
                            r = os.Stdin
                            fmt.Println("reading from stdin, Ctrl-D to finish")
                        } else {
                            r, err = os.Open(importFile)
                            checkErr("reading import file", err)
                        defer r.Close()

                        buf := bufio.NewReader(r)

                        checkErr("importing state", mgr.ImportState(buf, opts))
                        logger.Info("state successfully imported.  Make sure all peers have consistent states")
                        return nil
                    // "state cleanup": removes the persisted state through
                    // the state manager.
                    Name:  "cleanup",
                    Usage: "remove persistent data",
                    Description: `
This command removes any persisted consensus data in this peer, including the
current pinset (state). The next start of the peer will be like a first start
to all effects. Peers may need to bootstrap and sync from scratch after this.
                    Flags: []cli.Flag{
                            Name:  "force, f",
                            Usage: "skip confirmation prompt",
                    Action: func(c *cli.Context) error {
                        // Release the execution lock when done.
                        defer locker.tryUnlock()

                        // Removing the state needs confirmation unless
                        // --force is given.
                        confirm := fmt.Sprintf(
                            "%s Continue? [y/n]:",
                        if !c.Bool("force") && !yesNoPrompt(confirm) {
                            return nil

                        mgr := getStateManager()
                        checkErr("cleaning state", mgr.Clean())
                        logger.Info("data correctly cleaned up")
                        return nil
            // "version": prints version.Version followed by a newline on
            // stdout.
            Name:  "version",
            Usage: "Prints the ipfs-cluster version",
            Action: func(c *cli.Context) error {
                fmt.Printf("%s\n", version.Version)
                return nil

    // Default action when no command is given.
    app.Action = run


// run is the default action of the application; main assigns it to
// app.Action, so it is used when no command is given.
// NOTE(review): the previous comment read "run daemon() by default, or
// error", but the only statement visible in the body returns nil — confirm
// the intended behaviour.
func run(c *cli.Context) error {
    return nil

func setupLogLevel(debug bool, l string) error {
    // if debug is set to true, log everything in debug level
    if debug {
        ipfscluster.SetFacilityLogLevel("*", "DEBUG")
        return nil

    compLogLevel := strings.Split(l, ",")
    var logLevel string
    compLogFacs := make(map[string]string)
    // get overall log level and component-wise log levels from arguments
    for _, cll := range compLogLevel {
        if cll == "" {
        identifierToLevel := strings.Split(cll, ":")
        var lvl string
        var comp string
        switch len(identifierToLevel) {
        case 1:
            lvl = identifierToLevel[0]
            comp = "all"
        case 2:
            lvl = identifierToLevel[1]
            comp = identifierToLevel[0]
            return errors.New("log level not in expected format \"identifier:loglevel\" or \"loglevel\"")

        _, ok := compLogFacs[comp]
        if ok {
            fmt.Printf("overwriting existing %s log level\n", comp)
        compLogFacs[comp] = lvl

    logLevel, ok := compLogFacs["all"]
    if !ok {
        logLevel = defaultLogLevel
    } else {
        delete(compLogFacs, "all")

    // log service with logLevel
    ipfscluster.SetFacilityLogLevel("service", logLevel)

    logfacs := make(map[string]string)

    // fill component-wise log levels
    for identifier, level := range compLogFacs {
        logfacs[identifier] = level

    // Set the values for things not set by the user or for
    // things set by "all".
    for key := range ipfscluster.LoggingFacilities {
        if _, ok := logfacs[key]; !ok {
            logfacs[key] = logLevel

    // For Extra facilities, set the defaults per logging.go unless
    // manually set
    for key, defaultLvl := range ipfscluster.LoggingFacilitiesExtra {
        if _, ok := logfacs[key]; !ok {
            logfacs[key] = defaultLvl

    for identifier, level := range logfacs {
        ipfscluster.SetFacilityLogLevel(identifier, level)

    return nil

func userProvidedSecret(enterSecret bool) ([]byte, bool) {
    if enterSecret {
        secret := promptUser("Enter cluster secret (32-byte hex string): ")
        decodedSecret, err := ipfscluster.DecodeClusterSecret(secret)
        checkErr("parsing user-provided secret", err)
        return decodedSecret, true

    return nil, false

// promptUser prints msg on standard output and returns the next line read
// from standard input, without its line terminator.
//
// An empty string is returned when no line can be read (for example on
// end of input).
func promptUser(msg string) string {
    fmt.Print(msg)
    scanner := bufio.NewScanner(os.Stdin)
    // Text only holds a line after Scan has run. When Scan fails, Text
    // returns the empty string, which callers treat as "no input".
    scanner.Scan()
    return scanner.Text()
}

// yesNoPrompt asks a yes/no question on standard output and reads the answer
// from standard input. Lifted from go-ipfs/cmd/ipfs/daemon.go.
//
// It returns true for "y" or "Y". It returns false for "n", "N", an empty
// answer, unreadable input, or when no valid answer is given in three
// attempts.
func yesNoPrompt(prompt string) bool {
    for i := 0; i < 3; i++ {
        // The answer is scoped to the attempt so that a failed read is seen
        // as an empty answer instead of repeating the previous one.
        var s string
        fmt.Printf("%s ", prompt)
        // The error is deliberately ignored: on failure (end of input, bare
        // newline) s stays empty, which is treated as "no" below. fmt.Scanf
        // is kept because it does not read ahead, so later readers of
        // standard input still see their data.
        _, _ = fmt.Scanf("%s", &s)
        switch s {
        case "y", "Y":
            return true
        case "n", "N", "":
            return false
        }
        fmt.Println("Please press either 'y' or 'n'")
    }
    return false
}

// getStateManager loads the configuration and returns a state manager built
// from it. Failures in either step are reported through checkErr.
// NOTE(review): unlike getCrdt, no Shutdown of the configuration manager is
// visible here — confirm whether that is intended.
func getStateManager() cmdutils.StateManager {
    cfgHelper, err := cmdutils.NewLoadedConfigHelper(
    checkErr("loading configurations", err)
    mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
    checkErr("creating state manager", err)
    return mgr

// getCrdt opens the peer's datastore and returns a go-ds-crdt Datastore on
// top of it, for use by the "state crdt" subcommands. Every failure along
// the way is reported through checkErr.
func getCrdt() *dscrdt.Datastore {
    // Load all the configurations and identity
    cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
    checkErr("loading configurations", err)
    defer cfgHelper.Manager().Shutdown()

    // Get a state manager and the datastore
    mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
    checkErr("creating state manager", err)
    store, err := mgr.GetStore()
    checkErr("opening datastore", err)
    // The store must support batching; otherwise this is reported as an
    // error.
    batching, ok := store.(datastore.Batching)
    if !ok {
        checkErr("", errors.New("no batching store"))

    // Namespace configured for the crdt component.
    crdtNs := cfgHelper.Configs().Crdt.DatastoreNamespace

    // Separate namespaced view of the datastore for blocks.
    var blocksDatastore datastore.Batching = namespace.Wrap(

    // The ipfs-lite node is created offline.
    ipfs, err := ipfslite.New(
            Offline: true,
    checkErr("creating ipfs-lite offline node", err)

    // Start from the default options with RepairInterval set to zero.
    // NOTE(review): presumably zero disables periodic repair — confirm in
    // the go-ds-crdt Options documentation.
    opts := dscrdt.DefaultOptions()
    opts.RepairInterval = 0
    crdt, err := dscrdt.New(
    checkErr("creating crdt node", err)
    return crdt