usr/lib/python3/dist-packages/linuxmusterLinuxclient7/ldapHelper.py
import ldap, ldap.sasl, sys, getpass, subprocess
from linuxmusterLinuxclient7 import logging, constants, config, user, computer
_currentLdapConnection = None
def serverUrl():
"""
Returns the server URL
:return: The server URL
:rtype: str
"""
rc, networkConfig = config.network()
if not rc:
return False, None
serverHostname = networkConfig["serverHostname"]
return 'ldap://{0}'.format(serverHostname)
def baseDn():
"""
Returns the base DN
:return: The baseDN
:rtype: str
"""
rc, networkConfig = config.network()
if not rc:
return None
domain = networkConfig["domain"]
return "dc=" + domain.replace(".", ",dc=")
def conn():
"""
Returns the ldap connection object
:return: The ldap connection object
:rtype: ldap.ldapobject.LDAPObject
"""
global _currentLdapConnection
if _connect():
return _currentLdapConnection
return None
def searchOne(filter):
"""Searches the LDAP with a filter and returns the first found object
:param filter: A valid ldap filter
:type filter: str
:return: Tuple (success, ldap object as dict)
:rtype: tuple
"""
if conn() == None:
logging.error("Cannot talk to LDAP")
return False, None
try:
rawResult = conn().search_s(
baseDn(),
ldap.SCOPE_SUBTREE,
filter
)
except Exception as e:
logging.error("Error executing LDAP search!")
logging.exception(e)
return False, None
try:
result = {}
if len(rawResult) <= 0 or rawResult[0][0] == None:
logging.debug(f"Search \"{filter}\" did not return any objects")
return False, None
for k in rawResult[0][1]:
if rawResult[0][1][k] != None:
rawAttribute = rawResult[0][1][k]
try:
if len(rawAttribute) == 1:
result[k] = str(rawAttribute[0].decode())
elif len(rawAttribute) > 0:
result[k] = []
for rawItem in rawAttribute:
result[k].append(str(rawItem.decode()))
except UnicodeDecodeError:
continue
return True, result
except Exception as e:
logging.error("Error while reading ldap search results!")
logging.exception(e)
return False, None
def isObjectInGroup(objectDn, groupName):
"""
Check if a given object is in a given group
:param objectDn: The DN of the object
:type objectDn: str
:param groupName: The name of the group
:type groupName: str
:return: True if it is a member, False otherwise
:rtype: bool
"""
logging.debug("= Testing if object {0} is a member of group {1} =".format(objectDn, groupName))
rc, groupAdObject = searchOne("(&(member:1.2.840.113556.1.4.1941:={0})(sAMAccountName={1}))".format(objectDn, groupName))
logging.debug("=> Result: {} =".format(rc))
return rc
# --------------------
# - Helper functions -
# --------------------
def _connect():
global _currentLdapConnection
if not user.isInAD() and not (user.isRoot() or not computer.isInAD()):
logging.warning("Cannot perform LDAP search: User is not in AD!")
_currentLdapConnection = None
return False
if not _currentLdapConnection == None:
return True
try:
sasl_auth = ldap.sasl.sasl({} ,'GSSAPI')
_currentLdapConnection = ldap.initialize(serverUrl(), trace_level=0)
# TODO:
# conn.set_option(ldap.OPT_X_TLS_CACERTFILE, '/path/to/ca.pem')
# conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
# conn.start_tls_s()
_currentLdapConnection.set_option(ldap.OPT_REFERRALS,0)
_currentLdapConnection.protocol_version = ldap.VERSION3
_currentLdapConnection.sasl_interactive_bind_s("", sasl_auth)
except Exception as e:
_currentLdapConnection = None
logging.error("Cloud not bind to ldap!")
logging.exception(e)
return False
return True