AlexRogalskiy/java-patterns

View on GitHub
tilt_modules/namespace/Tiltfile

Summary

Maintainability
Test Coverage
# -*- mode: Python -*-

def namespace_yaml(name):
  """Returns YAML for a namespace

  Args:
    name: The namespace name. Currently not validated.

  Returns:
    The namespace YAML as a blob
  """

  return blob("""apiVersion: v1
kind: Namespace
metadata:
  name: %s
""" % name)

def namespace_create(name, allow_duplicates=False):
  """Creates a namespace in the current Kubernetes cluster.

  Args:
    name: The namespace name. Currently not validated.
    allow_duplicates: Whether or not k8s should be allowed to have two
      instances of the same resource. Useful if you have more than one
      Tiltfile trying to create the same namespace.
  """
  k8s_yaml(namespace_yaml(name), allow_duplicates=allow_duplicates)

def namespace_inject(x, ns):
  """Takes K8s yaml, sets its namespace to `ns`, and returns it as a blob.

  This modifies the yaml in two ways:
  1. Sets .metadata.namespace to `ns`
  2. Sets ..template.metadata.namespace to `ns`
     This ensures the namespace in, e.g., Deployment Pod Template Specs is
     set, but might have false positives if you have a CRD with some other
     element named 'template'.

  Args:
    x: K8s yaml. Either a filename (string) or the yaml itself (Blob)
    ns: The namespace to set the K8s objects to.

  Returns:
    Blob containing the K8s objects as yaml, with namespaces set to `ns`.
  """
  return _mutate_yaml(x, lambda o: _set_k8s_yaml_namespace(o, ns))


def _mutate_yaml(x, f):
  if type(x) == 'string':
    objects = read_yaml_stream(x)
  elif type(x) == 'blob':
    objects = decode_yaml_stream(x)
  else:
    fail('only takes string or blob, got: %s' % type(x))

  return encode_yaml_stream([f(o) for o in objects])

def _set_k8s_yaml_namespace(o, ns):
  if type(o) == 'dict' and type(o.get('metadata', None)) == 'dict':
    o['metadata']['namespace'] = ns
  else:
    fail('Cannot inject namespace. Object does not have an object field named metadata:\n%s' % encode_yaml(o))

  _set_template_namespace(o, ns)
  return o

def _set_template_namespace(o, ns):
  if type(o) == 'dict':
    for k, v in o.items():
      if k == 'template' and type(v) == 'dict' and type(v.get('metadata', None)) == 'dict':
        v['metadata']['namespace'] = ns
      if type(v) == 'dict' or type(v) == 'list':
        _set_template_namespace(v, ns)
  elif type(o) == 'list':
    for v in o:
      _set_template_namespace(v, ns)