Net-ng/kansha

View on GitHub
kansha/authentication/ldap/ldap_auth.py

Summary

Maintainability
B
5 hrs
Test Coverage
# -*- coding:utf-8 -*-
# --
# Copyright (c) 2012-2014 Net-ng.
# All rights reserved.
#
# This software is licensed under the BSD License, as described in
# the file LICENSE.txt, which you should have received as part of
# this distribution.
# --
from nagare import log

try:
    import ldap
except ImportError:
    ldap = None
import types


def toUTF8(v):
    if isinstance(v, unicode):
        return v.encode('utf-8')
    elif isinstance(v, (types.TupleType, types.ListType)):
        return [toUTF8(e) for e in v]
    elif isinstance(v, types.DictType):
        return dict([(toUTF8(k), toUTF8(v_)) for k, v_ in v.items()])
    else:
        return v


class LDAPAuth(object):
    def __init__(self, ldap_cfg):
        ldap_cfg = toUTF8(ldap_cfg)
        self.server = "ldap://%s:%s" % (ldap_cfg['host'], ldap_cfg['port'])
        self.users_base_dn = ldap_cfg['users_base_dn']

    def connect(self):
        """Connect to LDAP server

        Return:
            - a server connection
        """
        assert ldap, 'python_ldap not installed'
        return ldap.initialize(self.server)

    def get_user_dn(self, uid):
        raise NotImplementedError()

    def check_password(self, uid, password):
        """Check if the specified couple user/password is correct

        In:
           - ``uid`` -- the user id
           - ``password`` -- the user password
        Return:
            - True if password is checked
        """
        c = self.connect()
        dn = self.get_user_dn(uid)
        # Try to authenticate
        try:
            c.simple_bind_s(dn, password.encode('UTF-8'))
            return True
        except ldap.INVALID_CREDENTIALS:
            log.info("Bad credentials for DN %r" % dn)
        except ldap.SERVER_DOWN:
            log.critical("LDAP server down")
        finally:
            c.unbind()

    def get_profile(self, uid, password):
        raise NotImplementedError()


class NngLDAPAuth(LDAPAuth):
    def get_user_dn(self, uid):
        """Construct a user DN given an user id

        In:
            - ``uid`` -- the user id
        Return:
            - a string, the user DN
        """
        return 'uid=%s,%s' % (ldap.dn.escape_dn_chars(toUTF8(uid)), self.users_base_dn)

    def get_profile(self, uid, password):
        c = self.connect()
        ldap_result = c.search_s(self.get_user_dn(uid), ldap.SCOPE_BASE)[0][1]
        profile = {}
        profile['uid'] = ldap_result['uid'][0]
        profile['name'] = ldap_result['displayName'][0].decode('utf-8')
        profile['email'] = ldap_result['mail'][0]
        profile['picture'] = ldap_result['jpegPhoto'][0] if ('jpegPhoto' in ldap_result and
                                                             ldap_result['jpegPhoto']) else None
        c.unbind()
        return profile


class ADLDAPAuth(LDAPAuth):
    def connect(self):
        conn = super(ADLDAPAuth, self).connect()
        conn.set_option(ldap.OPT_REFERRALS, 0)
        conn.protocol_version = 3
        return conn

    def check_password(self, uid, password):
        """Check if the specified couple user/password is correct

        In:
           - ``uid`` -- the user id
           - ``password`` -- the user password
        Return:
            - True if password is checked
        """
        c = self.connect()
        # Try to authenticate
        try:
            c.simple_bind_s(uid, password)
            return True
        except ldap.INVALID_CREDENTIALS:
            log.info("Bad credentials for uid %r" % uid)
        except ldap.SERVER_DOWN:
            log.critical("LDAP server down")
        finally:
            c.unbind()

    def get_profile(self, uid, password):
        c = self.connect()
        c.simple_bind_s(uid, password)
        ldap_result = c.search_s(self.users_base_dn, ldap.SCOPE_SUBTREE,
                                 '(userPrincipalName=%s)' % ldap.dn.escape_dn_chars(toUTF8(uid)))[0][1]
        profile = {}
        profile['uid'] = ldap_result['sAMAccountName'][0]
        profile['name'] = ldap_result['displayName'][0].decode('utf-8')
        profile['email'] = ldap_result.get('mail', [''])[0]
        profile['picture'] = ldap_result['thumbnailPhoto'][0] if ('thumbnailPhoto' in ldap_result and
                                                                  ldap_result['thumbnailPhoto']) else None
        c.unbind()
        return profile