cloudsmith-io/cloudsmith-cli

View on GitHub
cloudsmith_cli/cli/commands/policy/vulnerability.py

Summary

Maintainability
F
6 days
Test Coverage
A
96%
"""CLI/Commands - Import all vulnerability policy commands."""
import json

import click

from ....core.api import orgs as api
from ... import command, decorators, utils, validators
from ...exceptions import handle_api_exceptions
from ...utils import fmt_bool, fmt_datetime, maybe_spinner, maybe_truncate_string
from .command import policy


def print_vulnerability_policies(policies):
    """
    Print vulnerability policies as a table, then a result-count line.

    Each item in ``policies`` is indexed by key (``name``, ``description``,
    ``min_severity``, ``allow_unknown_severity``, ``on_violation_quarantine``,
    ``package_query_string``, ``created_at``, ``updated_at``, ``slug_perm``).
    """
    column_names = [
        "Name",
        "Description",
        "Min Severity",
        "Allow Unknown Severity",
        "Quarantine On Violation",
        "Package Query",
        "Created",
        "Updated",
        "Identifier",
    ]

    def _styled_row(entry):
        # One styled cell per column, in the same order as column_names.
        return [
            click.style(entry["name"], fg="cyan"),
            click.style(entry["description"], fg="magenta"),
            click.style(entry["min_severity"], fg="yellow"),
            click.style(fmt_bool(entry["allow_unknown_severity"]), fg="yellow"),
            click.style(fmt_bool(entry["on_violation_quarantine"]), fg="yellow"),
            click.style(
                str(maybe_truncate_string(entry["package_query_string"])),
                fg="yellow",
            ),
            click.style(fmt_datetime(entry["created_at"]), fg="blue"),
            click.style(fmt_datetime(entry["updated_at"]), fg="blue"),
            click.style(entry["slug_perm"], fg="green"),
        ]

    table_rows = [_styled_row(entry) for entry in policies]

    # Blank lines before and after the table.
    click.echo()
    utils.pretty_print_table(column_names, table_rows, title="Vulnerability Policies")
    click.echo()

    # Singular "policy" for exactly one result, plural "policies" otherwise.
    total = len(table_rows)
    noun_ending = "y" if total == 1 else "ies"
    utils.pretty_print_list_info(
        num_results=total, suffix="vulnerability polic%s" % noun_ending
    )


# Click group mounted under the `policy` group as `vulnerability`. The
# list/create/update/delete commands in this module attach to it via
# `@vulnerability.command(...)`.
@policy.group(cls=command.AliasGroup, name="vulnerability", aliases=[])
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.pass_context
def vulnerability(*args, **kwargs):
    """
    Manage vulnerability policies for an organization.

    See the help for subcommands for more information on each.
    """
    # No body beyond the docstring: the group only hosts its subcommands.
    # The docstring doubles as the group's --help text, so edit it with care.


@vulnerability.command(name="list", aliases=["ls"])
@decorators.common_cli_config_options
@decorators.common_cli_list_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.argument(
    "owner", metavar="OWNER", callback=validators.validate_owner, required=True
)
@click.pass_context
def ls(ctx, opts, owner, page, page_size):
    """
    List vulnerability policies.

    This requires appropriate permissions for the owner (a member of the
    organisation and a valid API key).

    - OWNER: Specify the OWNER namespace (i.e. org)

      Example: 'your-org'

    Full CLI example:

      $ cloudsmith policy vulnerability list your-org
    """
    # The value arrives via the validate_owner callback; only its first
    # element is used as the namespace.
    namespace = owner[0]

    # Progress messages go to stderr whenever the output format is not
    # "pretty" (e.g. JSON), so that stdout carries only the requested data.
    to_stderr = opts.output != "pretty"

    click.echo("Getting vulnerability policies ... ", nl=False, err=to_stderr)

    with handle_api_exceptions(
        ctx, opts=opts, context_msg="Failed to get package vulnerability policies!"
    ):
        with maybe_spinner(opts):
            results, paging = api.list_vulnerability_policies(
                owner=namespace, page=page, page_size=page_size
            )

    click.secho("OK", fg="green", err=to_stderr)

    # Fall back to the table only when JSON output was not produced.
    if not utils.maybe_print_as_json(opts, results, paging):
        print_vulnerability_policies(results)


@vulnerability.command(aliases=["new"])
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.argument("owner", default=None, required=True)
@click.argument("policy_config_file", type=click.File("rb"), required=True)
@click.pass_context
def create(ctx, opts, owner, policy_config_file):
    """
    Create a new vulnerability policy in a namespace.

    - OWNER: Specify the OWNER namespace (i.e. user or org) where you want
      to create a vulnerability policy.

        Example: 'your-org'

    - POLICY_CONFIG_FILE: Config file specifying the settings for the
      vulnerability policy to be created.

        \b
        Example:
        {
          "name": "your-vulnerability-policy",
          "description": "your vulnerability policy description",
          "min_severity": "Critical",
          "package_query_string" : "format:python AND downloads:>50",
          "allow_unknown_severity": false,
          "on_violation_quarantine": true
        }

    Full CLI example:

      $ cloudsmith policy vulnerability create your-org policy-config-file.json
    """
    # NOTE: the docstring above is also the command's --help text.

    # Use stderr for messages if the output is something else (e.g. JSON)
    use_stderr = opts.output != "pretty"

    # Report an unreadable config file as a usage error rather than a
    # traceback. ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    try:
        policy_config = json.load(policy_config_file)
    except ValueError as exc:
        raise click.BadParameter(
            "Could not parse the policy config file as JSON: %s" % exc,
            param_hint="POLICY_CONFIG_FILE",
        ) from exc

    # The settings are looked up by key below, so the top level must be a
    # JSON object (a list or scalar would fail on .get()).
    if not isinstance(policy_config, dict):
        raise click.BadParameter(
            "The policy config file must contain a JSON object.",
            param_hint="POLICY_CONFIG_FILE",
        )

    policy_name = policy_config.get("name")
    if policy_name is None:
        # param_hint takes a plain string; `param` expects a click.Parameter
        # object and breaks error formatting when given a str.
        raise click.BadParameter(
            "Name is a required field for creating a vulnerability policy.",
            param_hint="name",
        )

    click.secho(
        "Creating %(name)s vulnerability policy for the %(owner)s namespace ..."
        % {
            "name": click.style(policy_name, bold=True),
            "owner": click.style(owner, bold=True),
        },
        nl=False,
        err=use_stderr,
    )

    context_msg = "Failed to create the vulnerability policy!"
    with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
        with maybe_spinner(opts):
            # Wrapped in a list so the shared JSON/table printers can be reused.
            policies = [api.create_vulnerability_policy(owner, policy_config)]

    click.secho("OK", fg="green", err=use_stderr)

    if utils.maybe_print_as_json(opts, policies):
        return

    print_vulnerability_policies(policies)


@vulnerability.command()
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.argument("owner", default=None, required=True)
@click.argument("identifier", default=None, required=True)
@click.argument("policy_config_file", type=click.File("rb"), required=True)
@click.pass_context
def update(ctx, opts, owner, identifier, policy_config_file):
    """
    Update a vulnerability policy.

    - OWNER: Specify the OWNER namespace (i.e. user or org) where you want
      to update a vulnerability policy.

        Example: 'your-org'

    - IDENTIFIER: Specify the vulnerability policy IDENTIFIER (i.e. slug_perm)
      for the vulnerability policy which you wish to update.

        Example: 'your-vulnerability-policy'

    - POLICY_CONFIG_FILE: Config file specifying the settings for the
      vulnerability policy to be updated.

        \b
        Example:
        {
          "name": "your-vulnerability-policy",
          "description": "your vulnerability policy description",
          "min_severity": "Critical",
          "package_query_string" : "format:python AND downloads:>50",
          "allow_unknown_severity": false,
          "on_violation_quarantine": true
        }

    Full CLI example:

      $ cloudsmith policy vulnerability update your-org your-vulnerability-policy policy-config-file.json
    """
    # NOTE: the docstring above is also the command's --help text.

    # Use stderr for messages if the output is something else (e.g. JSON)
    use_stderr = opts.output != "pretty"

    # Report an unreadable config file as a usage error rather than a
    # traceback. ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    try:
        policy_config = json.load(policy_config_file)
    except ValueError as exc:
        raise click.BadParameter(
            "Could not parse the policy config file as JSON: %s" % exc,
            param_hint="POLICY_CONFIG_FILE",
        ) from exc

    # The documented config format is a JSON object of policy settings;
    # reject anything else before calling the API.
    if not isinstance(policy_config, dict):
        raise click.BadParameter(
            "The policy config file must contain a JSON object.",
            param_hint="POLICY_CONFIG_FILE",
        )

    click.secho(
        "Updating %(slug_perm)s vulnerability policy in the %(owner)s namespace ..."
        % {
            "slug_perm": click.style(identifier, bold=True),
            "owner": click.style(owner, bold=True),
        },
        nl=False,
        err=use_stderr,
    )

    context_msg = "Failed to update the vulnerability policy!"
    with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
        with maybe_spinner(opts):
            # Wrapped in a list so the shared JSON/table printers can be reused.
            policies = [
                api.update_vulnerability_policy(owner, identifier, policy_config)
            ]

    click.secho("OK", fg="green", err=use_stderr)

    if utils.maybe_print_as_json(opts, policies):
        return

    print_vulnerability_policies(policies)


@vulnerability.command(aliases=["rm"])
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.argument("owner", metavar="OWNER")
@click.argument("identifier", default=None, required=True)
@click.option(
    "-y",
    "--yes",
    default=False,
    is_flag=True,
    help="Assume yes as default answer to questions (this is dangerous!)",
)
@click.pass_context
def delete(ctx, opts, owner, identifier, yes):
    """
    Delete a vulnerability policy from a namespace.

    - OWNER: Specify the OWNER namespace (i.e. org).

        Example: 'your-org'

    - IDENTIFIER: Specify the vulnerability policy IDENTIFIER (i.e. slug_perm)
      for the vulnerability policy which you wish to delete.

        Example: 'your-vulnerability-policy'

    Full CLI example:

      $ cloudsmith policy vulnerability delete your-org your-vulnerability-policy
    """
    # Bold versions of both values, shared by the confirmation prompt and the
    # progress message.
    styled = {
        "namespace": click.style(owner, bold=True),
        "slug_perm": click.style(identifier, bold=True),
    }

    question = (
        "delete the %(slug_perm)s vulnerability policy from the %(namespace)s namespace"
        % styled
    )

    # Stop without calling the API unless the operation is confirmed
    # (--yes is passed through as assume_yes).
    confirmed = utils.confirm_operation(question, assume_yes=yes)
    if not confirmed:
        return

    progress = "Deleting %(slug_perm)s from the %(namespace)s namespace ... " % styled
    click.secho(progress, nl=False)

    with handle_api_exceptions(
        ctx, opts=opts, context_msg="Failed to delete the vulnerability policy!"
    ):
        with maybe_spinner(opts):
            # The API receives the raw values, not the styled ones.
            api.delete_vulnerability_policy(owner=owner, slug_perm=identifier)

    click.secho("OK", fg="green")