gamunu/cassandra_snap
cassandra_snap/main.py

from __future__ import (absolute_import, print_function)

# From system
from collections import defaultdict
from fabric.api import env
import os.path
import logging
import sys

# From package
from .snapshotting import (BackupWorker, RestoreWorker,
                           Snapshot, SnapshotCollection)
from .utils import (add_s3_arguments, get_s3_connection_host)
from .utils import base_parser as _base_parser

# Honor the user's ~/.ssh/config (host aliases, identities, ports) when
# Fabric connects to the nodes.
env.use_ssh_config = True


def run_backup(args):
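    """Back up the given hosts' keyspaces to S3, creating a new snapshot
    or adding incrementals to the most recent matching one."""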
    if args.user:
        env.user = args.user

    if args.password:
        env.password = args.password

    if args.sshkey:
        env.key_filename = args.sshkey

    if args.sshport:
        env.port = args.sshport

    env.hosts = args.hosts.split(',')
    env.keyspaces = args.keyspaces.split(',') if args.keyspaces else None

    if args.new_snapshot:
        create_snapshot = True
    else:
        existing_snapshot = SnapshotCollection(
            args.aws_access_key_id,
            args.aws_secret_access_key,
            args.s3_base_path,
            args.s3_bucket_name,
            get_s3_connection_host(args.s3_bucket_region)
        ).get_snapshot_for(
            hosts=env.hosts,
            keyspaces=env.keyspaces,
            table=args.table
        )
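        # Reuse the matching snapshot if one exists; otherwise create one.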
        create_snapshot = existing_snapshot is None

    worker = BackupWorker(
        aws_access_key_id=args.aws_access_key_id,
        aws_secret_access_key=args.aws_secret_access_key,
        s3_bucket_region=args.s3_bucket_region,
        s3_ssenc=args.s3_ssenc,
        s3_connection_host=get_s3_connection_host(args.s3_bucket_region),
        cassandra_conf_path=args.cassandra_conf_path,
        nodetool_path=args.nodetool_path,
        cassandra_bin_dir=args.cassandra_bin_dir,
        cqlsh_user=args.cqlsh_user,
        cqlsh_password=args.cqlsh_password,
        backup_schema=args.backup_schema,
        buffer_size=args.buffer_size,
        use_sudo=args.use_sudo,
        connection_pool_size=args.connection_pool_size,
        exclude_tables=args.exclude_tables,
        reduced_redundancy=args.reduced_redundancy,
        rate_limit=args.rate_limit,
        quiet=args.quiet,
        nodetool_user=args.nodetool_user,
        nodetool_pass=args.nodetool_pass,
        nodetool_port=args.nodetool_port,
        nodetool_password_file=args.nodetool_password_file
    )

    if create_snapshot:
        logging.info("Make a new snapshot")
        snapshot = Snapshot(
            base_path=args.s3_base_path,
            s3_bucket=args.s3_bucket_name,
            hosts=env.hosts,
            keyspaces=env.keyspaces,
            table=args.table
        )
        worker.snapshot(snapshot)
    else:
        logging.info("Add incrementals to snapshot {!s}".format(
            existing_snapshot))
        worker.update_snapshot(existing_snapshot)


def list_backups(args):
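    """Print every snapshot stored in the S3 bucket, grouped by base path."""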
    snapshots = SnapshotCollection(
        args.aws_access_key_id,
        args.aws_secret_access_key,
        args.s3_base_path,
        args.s3_bucket_name,
        get_s3_connection_host(args.s3_bucket_region)
    )
    path_snapshots = defaultdict(list)

    for snapshot in snapshots:
        dir_path = os.path.dirname(snapshot.base_path)
        path_snapshots[dir_path].append(snapshot)

    for path, snapshots in path_snapshots.items():
        print("-----------[{!s}]-----------".format(path))
        for snapshot in snapshots:
            print("\t {!r} hosts:{!r} keyspaces:{!r} table:{!r}".format(
                snapshot, snapshot.hosts, snapshot.keyspaces, snapshot.table))
        print("------------------------{}".format('-' * len(path)))


def restore_backup(args):
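    """Download a snapshot from S3 and restore it. Unless --local or
    --no-sstableloader is given, sstableloader streams the data into
    the --target-hosts."""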
    snapshots = SnapshotCollection(
        args.aws_access_key_id,
        args.aws_secret_access_key,
        args.s3_base_path,
        args.s3_bucket_name,
        get_s3_connection_host(args.s3_bucket_region),
    )
    if args.snapshot_name == 'LATEST':
        snapshot = snapshots.get_latest()
    else:
        snapshot = snapshots.get_snapshot_by_name(args.snapshot_name)

    worker = RestoreWorker(aws_access_key_id=args.aws_access_key_id,
                           aws_secret_access_key=args.aws_secret_access_key,
                           snapshot=snapshot,
                           cassandra_bin_dir=args.cassandra_bin_dir,
                           restore_dir=args.restore_dir,
                           no_sstableloader=args.no_sstableloader,
                           local_restore=args.local_restore)

    if args.hosts:
        hosts = args.hosts.split(',')

        if args.local_restore and len(hosts) > 1:
            logging.error(
                "You must provide only one source host when using --local.")
            sys.exit(1)
    else:
        hosts = snapshot.hosts

        if args.local_restore:
            logging.error(
                "You must provide one source host when using --local.")
            sys.exit(1)

    # --target-hosts is mutually exclusive with --local and --no-sstableloader
    if args.target_hosts:
        target_hosts = args.target_hosts.split(',')
    else:
        # --local or --no-sstableloader: no streaming will occur
        target_hosts = None

    worker.restore(args.keyspace, args.table, hosts, target_hosts)


def main():
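    """Parse the command line arguments and dispatch to the subcommand."""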
    base_parser = add_s3_arguments(_base_parser)
    subparsers = base_parser.add_subparsers(
        title='subcommands', dest='subcommand'
    )

    subparsers.add_parser('list', help="List existing backups")

    backup_parser = subparsers.add_parser(
        'backup', help="Create a new snapshot or add incrementals to one")

    # snapshot / backup arguments
    backup_parser.add_argument(
        '--buffer-size',
        default=64,
        type=int,
        help="Buffer size (MB) used for compression and upload")

    backup_parser.add_argument(
        '--exclude-tables',
        default='',
        help="Column families you want to skip")

    backup_parser.add_argument(
        '--hosts',
        required=True,
        help="Comma separated list of hosts to snapshot")

    backup_parser.add_argument(
        '--keyspaces',
        default='',
        help="Comma separated list of keyspaces to backup (omit to backup all)")

    backup_parser.add_argument(
        '--table',
        default='',
        help="The table (column family) to backup")

    backup_parser.add_argument(
        '--cassandra-conf-path',
        default='/etc/cassandra/conf/',
        help="cassandra config file path")

    backup_parser.add_argument(
        '--nodetool-path',
        default=None,
        help="nodetool path")

    backup_parser.add_argument(
        '--cassandra-bin-dir',
        default='/usr/bin',
        help="cassandra binaries directory")

    backup_parser.add_argument(
        '--user',
        help="The ssh user to logging on nodes")

    backup_parser.add_argument(
        '--use-sudo',
        action='store_true',
        help="Use sudo to run the backup")

    backup_parser.add_argument(
        '--sshport',
        help="The ssh port to use to connect to the nodes")

    backup_parser.add_argument(
        '--password',
        default='',
        help="User password to connect with hosts")

    backup_parser.add_argument(
        '--sshkey',
        help="The file containing the private ssh key to use to connect with hosts")

    backup_parser.add_argument(
        '--new-snapshot',
        action='store_true',
        help="Create a new snapshot")

    backup_parser.add_argument(
        '--backup-schema',
        action='store_true',
        help="Backup (thrift) schema of selected keyspaces")

    backup_parser.add_argument(
        '--cqlsh-user',
        default='',
        help="User to use for cqlsh commands")

    backup_parser.add_argument(
        '--cqlsh-password',
        default='',
        help="Password to use for cqlsh commands")

    backup_parser.add_argument(
        '--connection-pool-size',
        default=12,
        type=int,
        help="Number of simultaneous connections to Cassandra nodes")

    backup_parser.add_argument(
        '--reduced-redundancy',
        action='store_true',
        help="Use S3 reduced redundancy")

    backup_parser.add_argument(
        '--rate-limit',
        default=0,
        type=int,
        help="Limit the upload speed to S3 using 'pv'; "
             "value expressed in KiB (x1024 bytes)")

    backup_parser.add_argument(
        '--nodetool-user',
        help="Username to use for nodetool"
    )

    backup_parser.add_argument(
        '--nodetool-pass',
        help="Password to use for nodetool"
    )

    backup_parser.add_argument(
        '--nodetool-password-file',
        help="Password file to use for nodetool"
    )

    backup_parser.add_argument(
        '--nodetool-port',
        help="Port to use for nodetool"
    )

    backup_parser.add_argument(
        '--quiet',
        action='store_true',
        help="Set pv in quiet mode when using --rate-limit. "
             "Useful when called by a script.")

    # restore snapshot arguments
    restore_parser = subparsers.add_parser(
        'restore', help="Restores a snapshot")

    restore_parser.add_argument(
        '--snapshot-name',
        default='LATEST',
        help="The name (date/time) \
            of the snapshot (and incrementals) to restore")

    restore_parser.add_argument(
        '--keyspace',
        required=True,
        help="The keyspace to restore")

    restore_parser.add_argument(
        '--table',
        default='',
        help="The table (column family) to restore; leave blank for all")

    restore_parser.add_argument(
        '--hosts',
        default='',
        help="Comma separated list of hosts to restore from; "
             "leave empty for all. Only one host allowed when using --local.")

    restore_parser.add_argument(
        '--cassandra-bin-dir',
        default='/usr/bin',
        help="cassandra binaries directory")

    restore_parser.add_argument(
        '--restore-dir',
        default='/tmp/restore_cassandra/',
        help="Directory where data will be downloaded. "
             "Existing data in this directory will be *ERASED*. "
             "If --target-hosts is passed, sstableloader will stream data "
             "from this directory.")

    restore_type = restore_parser.add_mutually_exclusive_group(required=True)

    restore_type.add_argument(
        '--target-hosts',
        help='Comma separated list of hosts to restore into')

    restore_type.add_argument(
        '--local',
        action='store_true',
        dest='local_restore',
        help='Do not run sstableloader when restoring. If set, files will '
             'just be downloaded and decompressed in --restore-dir.')

    restore_type.add_argument(
        '--no-sstableloader',
        action='store_true',
        help="Do not run sstableloader when restoring. "
             "If set, files will just be downloaded. Use it if you want to do "
             "some checks and then run sstableloader manually.")

    args = base_parser.parse_args()
    subcommand = args.subcommand

    if args.verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if subcommand == 'backup':
        run_backup(args)
    elif subcommand == 'list':
        list_backups(args)
    elif subcommand == 'restore':
        restore_backup(args)


if __name__ == '__main__':
    main()
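
# Example invocations (a sketch only; the module path and the S3 flag names
# below are assumptions, not taken from this file):
#
#   python -m cassandra_snap.main backup \
#       --hosts node1,node2 --keyspaces ks1 --s3-bucket-name my-bucket
#   python -m cassandra_snap.main list --s3-bucket-name my-bucket
#   python -m cassandra_snap.main restore --keyspace ks1 \
#       --target-hosts node1,node2 --s3-bucket-name my-bucket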