salt/runners/venafiapi.py
# -*- coding: utf-8 -*-
'''
Support for Venafi
Before using this module you need to register an account with Venafi, and
configure it in your ``master`` configuration file.
First, you need to add a placeholder to the ``master`` file. This is because
the module will not load unless it finds an ``api_key`` setting, valid or not.
Open up ``/etc/salt/master`` and add:
.. code-block:: yaml
venafi:
api_key: None
Then register your email address with Venafi using the following command:
.. code-block:: bash
salt-run venafi.register <youremail@yourdomain.com>
This command will not return an ``api_key`` to you; that will be sent to you
via email from Venafi. Once you have received that key, open up your ``master``
file and set the ``api_key`` to it:
.. code-block:: yaml
venafi:
api_key: abcdef01-2345-6789-abcd-ef0123456789
'''
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
import tempfile
try:
from M2Crypto import RSA
HAS_M2 = True
except ImportError:
HAS_M2 = False
try:
from Cryptodome.PublicKey import RSA
except ImportError:
from Crypto.PublicKey import RSA
# Import Salt libs
import salt.cache
import salt.syspaths as syspaths
import salt.utils.files
import salt.utils.json
import salt.utils.stringutils
from salt.exceptions import CommandExecutionError
# Import 3rd-party libs
from salt.ext import six
# Name returned by ``__virtual__`` below; the CLI examples in this file call
# the runner's functions as ``venafi.<function>``.
__virtualname__ = 'venafi'
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def __virtual__():
    '''
    Return the virtual name when a truthy ``api_key`` is configured under the
    ``venafi`` section of ``__opts__``; otherwise return ``False``.
    '''
    venafi_opts = __opts__.get('venafi', {})
    return __virtualname__ if venafi_opts.get('api_key') else False
def _base_url():
    '''
    Return the configured ``venafi:base_url``, falling back to the public
    Venafi Cloud v1 endpoint when none is set.
    '''
    venafi_opts = __opts__.get('venafi', {})
    return venafi_opts.get('base_url', 'https://api.venafi.cloud/v1')
def _api_key():
    '''
    Return the configured ``venafi:api_key``, or an empty string when the
    option is absent.
    '''
    venafi_opts = __opts__.get('venafi', {})
    return venafi_opts.get('api_key', '')
def gen_key(minion_id, dns_name=None, zone='default', password=None):
    '''
    Generate and return a private key. If a ``dns_name`` is passed in, the
    private key will be cached under that name (together with the
    ``minion_id``). The type of key and the parameters used to generate the
    key are based on the default certificate use policy associated with the
    specified zone.

    Only RSA keys are supported. Key lengths below 2048 bits are raised to
    2048. When ``password`` is ``None`` the key is returned unencrypted.

    Raises ``CommandExecutionError`` when the zone has no ``CERTIFICATE_USE``
    policy or when that policy asks for a key type other than RSA.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.gen_key <minion_id> [dns_name] [zone] [password]
    '''
    # Get the default certificate use policy associated with the zone
    # so we can generate keys that conform with policy

    # The /v1/zones/tag/{name} API call is a shortcut to get the zoneID
    # directly from the name
    qdata = __utils__['http.query'](
        '{0}/zones/tag/{1}'.format(_base_url(), zone),
        method='GET',
        decode=True,
        decode_type='json',
        header_dict={
            'tppl-api-key': _api_key(),
            'Content-Type': 'application/json',
        },
    )
    zone_id = qdata['dict']['id']

    # the /v1/certificatepolicies?zoneId API call returns the default
    # certificate use and certificate identity policies
    qdata = __utils__['http.query'](
        '{0}/certificatepolicies?zoneId={1}'.format(_base_url(), zone_id),
        method='GET',
        decode=True,
        decode_type='json',
        header_dict={
            'tppl-api-key': _api_key(),
            'Content-Type': 'application/json',
        },
    )
    policies = qdata['dict']['certificatePolicies']

    # Extract the key length and key type from the certificate use policy.
    # Start from None so that a missing policy is detected explicitly
    # instead of surfacing later as an UnboundLocalError.
    keygen_type = None
    key_len = None
    for policy in policies:
        if policy['certificatePolicyType'] == "CERTIFICATE_USE":
            key_types = policy['keyTypes']
            # in case multiple keytypes and key lengths are supported
            # always use the first key type and key length
            keygen_type = key_types[0]['keyType']
            key_len = key_types[0]['keyLengths'][0]

    if keygen_type is None:
        raise CommandExecutionError(
            'No CERTIFICATE_USE policy was found for zone {0}'.format(zone)
        )
    if keygen_type != "RSA":
        raise CommandExecutionError(
            'Unsupported key type {0} in the certificate use policy of zone '
            '{1}; only RSA keys can be generated'.format(keygen_type, zone)
        )

    # Normalise to an integer and enforce the 2048 bit minimum
    key_len = max(int(key_len), 2048)

    if HAS_M2:
        gen = RSA.gen_key(key_len, 65537)
        if password is None:
            # No passphrase: write the key in the clear, which is what the
            # PyCrypto branch below does when its passphrase is None
            private_key = gen.as_pem(cipher=None)
        else:
            private_key = gen.as_pem(
                cipher='des_ede3_cbc',
                callback=lambda x: six.b(password),
            )
    else:
        gen = RSA.generate(bits=key_len)
        private_key = gen.exportKey('PEM', password)

    if dns_name is not None:
        bank = 'venafi/domains'
        cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
        try:
            data = cache.fetch(bank, dns_name)
            data['private_key'] = private_key
            data['minion_id'] = minion_id
        except TypeError:
            # The fetched value does not support item assignment
            # (e.g. None); start a fresh record
            data = {'private_key': private_key,
                    'minion_id': minion_id}
        cache.store(bank, dns_name, data)
    return private_key
def gen_csr(
        minion_id,
        dns_name,
        zone='default',
        country=None,
        state=None,
        loc=None,
        org=None,
        org_unit=None,
        password=None,
):
    '''
    Generate a csr using the host's private_key.

    The private key is taken from the ``venafi/domains`` cache entry for
    ``dns_name``; when that entry has no key, one is generated with
    ``gen_key``. Subject fields left as ``None`` are read from the ``venafi``
    section of the master configuration. The resulting CSR is stored in the
    same cache entry and returned.

    The key and CSR are written to a temporary directory while ``openssl``
    runs; that directory is removed before this function returns or raises.

    Raises ``CommandExecutionError`` when ``openssl`` reports problems making
    the certificate request.

    Analogous to:

    .. code-block:: bash

        VCert gencsr -cn [CN Value] -o "Beta Organization" -ou "Beta Group" \
            -l "Palo Alto" -st "California" -c US

    CLI Example:

    .. code-block:: bash

        salt-run venafi.gen_csr <minion_id> <dns_name>
    '''
    # Imported here so the block does not depend on a file-level import
    import shutil

    bank = 'venafi/domains'
    cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    data = cache.fetch(bank, dns_name)
    if data is None:
        data = {}
    if 'private_key' not in data:
        data['private_key'] = gen_key(minion_id, dns_name, zone, password)

    venafi_opts = __opts__.get('venafi', {})
    if country is None:
        country = venafi_opts.get('country')
    if state is None:
        state = venafi_opts.get('state')
    if loc is None:
        loc = venafi_opts.get('loc')
    if org is None:
        org = venafi_opts.get('org')
    if org_unit is None:
        org_unit = venafi_opts.get('org_unit')

    subject = '/C={0}/ST={1}/L={2}/O={3}/OU={4}/CN={5}'.format(
        country,
        state,
        loc,
        org,
        org_unit,
        dns_name,
    )

    tmpdir = tempfile.mkdtemp()
    try:
        os.chmod(tmpdir, 0o700)
        tmppriv = '{0}/priv'.format(tmpdir)
        tmpcsr = '{0}/csr'.format(tmpdir)
        with salt.utils.files.fopen(tmppriv, 'w') as if_:
            if_.write(salt.utils.stringutils.to_str(data['private_key']))

        # NOTE(review): the command is built as a shell string; subject
        # values or a password containing quotes or shell metacharacters
        # are not escaped here.
        cmd = "openssl req -new -sha256 -key {0} -out {1} -subj '{2}'".format(
            tmppriv,
            tmpcsr,
            subject
        )
        if password is not None:
            cmd += ' -passin pass:{0}'.format(password)
        output = __salt__['salt.cmd']('cmd.run', cmd)

        if 'problems making Certificate Request' in output:
            raise CommandExecutionError(
                'There was a problem generating the CSR. Please ensure that you '
                'have the following variables set either on the command line, or '
                'in the venafi section of your master configuration file: '
                'country, state, loc, org, org_unit'
            )

        with salt.utils.files.fopen(tmpcsr, 'r') as of_:
            csr = salt.utils.stringutils.to_unicode(of_.read())
    finally:
        # Never leave the private key behind on disk, even on failure
        shutil.rmtree(tmpdir, ignore_errors=True)

    data['minion_id'] = minion_id
    data['csr'] = csr
    cache.store(bank, dns_name, data)
    return csr
def request(
        minion_id,
        dns_name=None,
        zone='default',
        request_id=None,
        country='US',
        state='California',
        loc='Palo Alto',
        org='Beta Organization',
        org_unit='Beta Group',
        password=None,
        zone_id=None,
):
    '''
    Request a new certificate

    A fresh private key and CSR are generated for ``dns_name``, the CSR is
    posted to the ``certificaterequests`` endpoint, and the resulting request
    id, key, CSR and zone are stored in the ``venafi/domains`` cache and
    returned. A ``password`` starting with ``sdb://`` is resolved through
    ``sdb.get`` first. The ``request_id`` argument is not read; it is
    replaced by the id returned from the API.

    Raises ``CommandExecutionError`` when no zone id can be determined.

    Uses the following command:

    .. code-block:: bash

        VCert enroll -z <zone> -k <api key> -cn <domain name>

    CLI Example:

    .. code-block:: bash

        salt-run venafi.request <minion_id> <dns_name>
    '''
    if password is not None and password.startswith('sdb://'):
        password = __salt__['sdb.get'](password)

    # Resolve the zone id: explicit argument, then master config, then a
    # lookup by zone name
    if zone_id is None:
        zone_id = __opts__.get('venafi', {}).get('zone_id')
    if zone_id is None and zone is not None:
        zone_id = get_zone_id(zone)
    if zone_id is None:
        raise CommandExecutionError(
            'Either a zone or a zone_id must be passed in or '
            'configured in the master file. This id can be retreived using '
            'venafi.show_company <domain>'
        )

    private_key = gen_key(minion_id, dns_name, zone, password)
    csr = gen_csr(
        minion_id,
        dns_name,
        zone=zone,
        country=country,
        state=state,
        loc=loc,
        org=org,
        org_unit=org_unit,
        password=password,
    )

    payload = salt.utils.json.dumps({
        'zoneId': zone_id,
        'certificateSigningRequest': csr,
    })
    response = __utils__['http.query'](
        '{0}/certificaterequests'.format(_base_url()),
        method='POST',
        data=payload,
        decode=True,
        decode_type='json',
        header_dict={
            'tppl-api-key': _api_key(),
            'Content-Type': 'application/json',
        },
    )
    request_id = response['dict']['certificateRequests'][0]['id']

    ret = {
        'request_id': request_id,
        'private_key': private_key,
        'csr': csr,
        'zone': zone,
    }

    domain_bank = 'venafi/domains'
    domain_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    cached = domain_cache.fetch(domain_bank, dns_name)
    if cached is None:
        cached = {}
    cached['minion_id'] = minion_id
    cached['request_id'] = request_id
    cached['private_key'] = private_key
    cached['zone'] = zone
    cached['csr'] = csr
    domain_cache.store(domain_bank, dns_name, cached)

    _id_map(minion_id, dns_name)
    return ret


# Request and renew are the same, so far as this module is concerned
renew = request
def _id_map(minion_id, dns_name):
    '''
    Record ``dns_name`` in the list cached for ``minion_id`` in the
    ``venafi/minions`` bank. A cached value that is not a list is replaced
    by a new list.
    '''
    minion_bank = 'venafi/minions'
    minion_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    known_names = minion_cache.fetch(minion_bank, minion_id)
    known_names = known_names if isinstance(known_names, list) else []
    if dns_name not in known_names:
        known_names.append(dns_name)
    minion_cache.store(minion_bank, minion_id, known_names)
def register(email):
    '''
    Register a new user account

    Posts ``email`` as the username of an ``API`` account to the
    ``useraccounts`` endpoint and returns the decoded response. Raises
    ``CommandExecutionError`` on a 4xx or 5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.register email@example.com
    '''
    url = '{0}/useraccounts'.format(_base_url())
    payload = salt.utils.json.dumps({
        'username': email,
        'userAccountType': 'API',
    })
    response = __utils__['http.query'](
        url,
        method='POST',
        data=payload,
        status=True,
        decode=True,
        decode_type='json',
        header_dict={
            'Content-Type': 'application/json',
        },
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response.get('dict', {})
def show_company(domain):
    '''
    Show company information, especially the company id

    Raises ``CommandExecutionError`` on a 4xx or 5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_company example.com
    '''
    response = __utils__['http.query'](
        '{0}/companies/domain/{1}'.format(_base_url(), domain),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response.get('dict', {})
def show_csrs():
    '''
    Show certificate requests for this API key

    Raises ``CommandExecutionError`` on a 4xx or 5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_csrs
    '''
    response = __utils__['http.query'](
        '{0}/certificaterequests'.format(_base_url()),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response.get('dict', {})
def get_zone_id(zone_name):
    '''
    Get the zone ID for the given zone name

    Queries ``zones/tag/<zone_name>`` and returns the ``id`` field of the
    decoded response. Raises ``CommandExecutionError`` on a 4xx or 5xx
    status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.get_zone_id default
    '''
    response = __utils__['http.query'](
        '{0}/zones/tag/{1}'.format(_base_url(), zone_name),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response['dict']['id']
def show_policies():
    '''
    Show the certificate policies returned by the ``certificatepolicies``
    endpoint for this API key

    Raises ``CommandExecutionError`` on a 4xx or 5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_policies
    '''
    response = __utils__['http.query'](
        '{0}/certificatepolicies'.format(_base_url()),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response['dict']
def show_zones():
    '''
    Show zone details for the API key owner's company

    Raises ``CommandExecutionError`` on a 4xx or 5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_zones
    '''
    response = __utils__['http.query'](
        '{0}/zones'.format(_base_url()),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(response['error'])
        )
    return response['dict']
def show_cert(id_):
    '''
    Fetch the issued certificate chain for a certificate request

    Downloads the PEM chain (root first) for request ``id_``, looks up the
    request itself to find its ``cn=`` name, merges the parsed certificates
    into the ``venafi/domains`` cache entry for that name, and returns them
    together with the cached private key and the request id.

    Raises ``CommandExecutionError`` when either API call returns a 4xx or
    5xx status.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_cert 01234567-89ab-cdef-0123-456789abcdef
    '''
    cert_response = __utils__['http.query'](
        '{0}/certificaterequests/{1}/certificate'.format(_base_url(), id_),
        params={
            'format': 'PEM',
            'chainOrder': 'ROOT_FIRST'
        },
        status=True,
        text=True,
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(cert_response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(cert_response['error'])
        )
    pem_text = cert_response.get('body', '')

    request_response = __utils__['http.query'](
        '{0}/certificaterequests/{1}'.format(_base_url(), id_),
        status=True,
        decode=True,
        decode_type='json',
        header_dict={'tppl-api-key': _api_key()},
    )
    if six.text_type(request_response['status']).startswith(('4', '5')):
        raise CommandExecutionError(
            'There was an API error: {0}'.format(request_response['error'])
        )
    request_info = request_response.get('dict', {})

    certs = _parse_certs(pem_text)

    # The last ``cn=`` component of the certificate name is the cache key
    dns_name = ''
    for component in request_info['certificateName'].split(','):
        if component.startswith('cn='):
            dns_name = component.split('=')[1]

    domain_bank = 'venafi/domains'
    domain_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    cached = domain_cache.fetch(domain_bank, dns_name)
    if cached is None:
        cached = {}
    # Report the cached key rather than whatever the PEM parser found
    certs['private_key'] = cached.get('private_key')
    cached.update(certs)
    domain_cache.store(domain_bank, dns_name, cached)

    # Added after the store, so the request id is returned but not cached
    certs['request_id'] = id_
    return certs


pickup = show_cert
def show_rsa(minion_id, dns_name):
    '''
    Show a private RSA key

    Returns the ``private_key`` stored in the ``venafi/domains`` cache entry
    for ``dns_name``. ``minion_id`` is accepted but not used for the lookup.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.show_rsa myminion domain.example.com
    '''
    domain_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    cached = domain_cache.fetch('venafi/domains', dns_name)
    return cached['private_key']
def list_domain_cache():
    '''
    List domains that have been cached

    Returns the listing of the ``venafi/domains`` cache bank.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.list_domain_cache
    '''
    domain_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    cached_domains = domain_cache.list('venafi/domains')
    return cached_domains
def del_cached_domain(domains):
    '''
    Delete cached domains from the master

    ``domains`` may be a comma-separated string or a list of domain names.
    Returns a dict with the names that were flushed under ``Succeeded`` and
    those whose flush raised ``CommandExecutionError`` under ``Failed``.

    Raises ``CommandExecutionError`` when ``domains`` is neither a string
    nor a list.

    CLI Example:

    .. code-block:: bash

        salt-run venafi.del_cached_domain domain1.example.com,domain2.example.com
    '''
    domain_cache = salt.cache.Cache(__opts__, syspaths.CACHE_DIR)
    if isinstance(domains, six.string_types):
        domains = domains.split(',')
    if not isinstance(domains, list):
        raise CommandExecutionError(
            'You must pass in either a string containing one or more domains '
            'separated by commas, or a list of single domain strings'
        )

    outcome = {'Succeeded': [], 'Failed': []}
    for name in domains:
        try:
            domain_cache.flush('venafi/domains', name)
        except CommandExecutionError:
            outcome['Failed'].append(name)
        else:
            outcome['Succeeded'].append(name)
    return outcome
def _parse_certs(data):
cert_mode = False
cert = ''
certs = []
rsa_key = ''
for line in data.splitlines():
if not line.strip():
continue
if 'Successfully posted request' in line:
comps = line.split(' for ')
request_id = comps[-1].strip()
continue
if 'END CERTIFICATE' in line or 'END RSA private_key' in line:
if 'RSA' in line:
rsa_key = rsa_key + line
else:
cert = cert + line
certs.append(cert)
cert_mode = False
continue
if 'BEGIN CERTIFICATE' in line or 'BEGIN RSA private_key' in line:
if 'RSA' in line:
rsa_key = line + '\n'
else:
cert = line + '\n'
cert_mode = True
continue
if cert_mode is True:
cert = cert + line + '\n'
continue
rcert = certs.pop(0)
eecert = certs.pop(-1)
ret = {
'end_entity_certificate': eecert,
'private_key': rsa_key,
'root_certificate': rcert,
'intermediate_certificates': certs
}
return ret