OCA/server-tools

View on GitHub
users_ldap_push/models/res_users.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
##############################################################################
#
#    This module copyright (C) 2015 Therp BV (<http://therp.nl>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from openerp import _, models, fields, api, exceptions
import logging

_logger = logging.getLogger(__name__)

try:
    import ldap
    import ldap.modlist
except ImportError:
    _logger.debug('Can not `from ldap.filter import filter_format`.')


class ResUsers(models.Model):
    _inherit = 'res.users'

    ldap_entry_dn = fields.Char('LDAP DN', readonly=True)
    is_ldap_user = fields.Boolean(
        'LDAP user', compute='_compute_is_ldap_user', default=True)

    @api.model
    @api.returns('self', lambda record: record.id)
    def create(self, values):
        result = super(ResUsers, self).create(values)
        result.push_to_ldap(values)
        return result

    @api.multi
    def write(self, values):
        result = super(ResUsers, self).write(values)
        self.push_to_ldap(values)
        return result

    @api.multi
    def _push_to_ldap_possible(self, values):
        return bool(self._get_ldap_configuration())

    @api.multi
    def _get_ldap_configuration(self):
        self.ensure_one()
        return self.sudo().company_id.ldaps.filtered('create_ldap_entry')[:1]

    @api.multi
    def _get_ldap_values(self, values):
        self.ensure_one()
        conf = self._get_ldap_configuration()
        result = {}
        for mapping in conf.create_ldap_entry_field_mappings:
            field_name = mapping.field_id.name
            if field_name not in values or not values[field_name]:
                continue
            result[mapping.attribute] = [values[field_name].encode('utf-8')]
        if result:
            result['objectClass'] = conf.create_ldap_entry_objectclass\
                .encode('utf-8').split(',')
        return result

    @api.multi
    def _get_ldap_dn(self, values):
        self.ensure_one()
        conf = self._get_ldap_configuration()
        dn = conf.create_ldap_entry_field_mappings.filtered('use_for_dn')
        assert dn, 'No DN attribute mapping given!'
        assert self[dn.field_id.name], 'DN attribute empty!'
        return '%s=%s,%s' % (
            dn.attribute,
            ldap.dn.escape_dn_chars(self[dn.field_id.name].encode('utf-8')),
            conf.create_ldap_entry_base or conf.ldap_base)

    @api.multi
    def push_to_ldap(self, values):
        for this in self:
            if not values.get('is_ldap_user') and not this.is_ldap_user:
                continue
            if not this._push_to_ldap_possible(values):
                continue
            ldap_values = this._get_ldap_values(values)
            if not ldap_values:
                continue
            ldap_configuration = this._get_ldap_configuration()
            ldap_connection = ldap_configuration.connect(
                ldap_configuration.read()[0])
            ldap_connection.simple_bind_s(
                (ldap_configuration.ldap_binddn or '').encode('utf-8'),
                (ldap_configuration.ldap_password or '').encode('utf-8'))

            try:
                if not this.ldap_entry_dn:
                    this._push_to_ldap_create(
                        ldap_connection, ldap_configuration, values,
                        ldap_values)
                if this.ldap_entry_dn:
                    this._push_to_ldap_write(
                        ldap_connection, ldap_configuration, values,
                        ldap_values)
            except ldap.LDAPError as e:
                _logger.exception(e)
                raise exceptions.Warning(_('Error'), e.message)
            finally:
                ldap_connection.unbind_s()

    @api.multi
    def _push_to_ldap_create(self, ldap_connection, ldap_configuration, values,
                             ldap_values):
        self.ensure_one()
        dn = self._get_ldap_dn(values)
        ldap_connection.add_s(
            dn,
            ldap.modlist.addModlist(ldap_values))
        self.write({'ldap_entry_dn': dn})

    @api.multi
    def _push_to_ldap_write(self, ldap_connection, ldap_configuration, values,
                            ldap_values):
        self.ensure_one()
        dn = self.ldap_entry_dn.encode('utf-8')
        dn_mapping = ldap_configuration.create_ldap_entry_field_mappings\
            .filtered('use_for_dn')
        if dn_mapping.attribute in ldap_values:
            ldap_values.pop(dn_mapping.attribute)
        ldap_entry = ldap_connection.search_s(
            dn, ldap.SCOPE_BASE, '(objectClass=*)',
            map(lambda x: x.encode('utf-8'), ldap_values.keys()))
        assert ldap_entry, '%s not found!' % self.ldap_entry_dn
        ldap_entry = ldap_entry[0][1]
        ldap_connection.modify_s(
            dn,
            ldap.modlist.modifyModlist(ldap_entry, ldap_values))

    @api.one
    @api.depends('ldap_entry_dn')
    def _compute_is_ldap_user(self):
        self.is_ldap_user = bool(self.ldap_entry_dn)

    @api.one
    def _change_ldap_password(self, new_passwd, auth_dn=None,
                              auth_passwd=None):
        ldap_configuration = self.env.user.sudo()._get_ldap_configuration()
        ldap_connection = ldap_configuration.connect(
            ldap_configuration.read()[0])
        dn = auth_dn or ldap_configuration.ldap_binddn
        old_passwd = auth_passwd or ldap_configuration.ldap_password
        ldap_connection.simple_bind_s(
            dn.encode('utf-8'), old_passwd.encode('utf-8'))
        self.env['ir.model.access'].check('res.users', 'write')
        self.env.user.check_access_rule('write')
        try:
            ldap_connection.passwd_s(
                self.ldap_entry_dn, None, new_passwd.encode('utf-8'))
        except ldap.LDAPError, e:
            raise exceptions.Warning(_('Error'), e.message)
        finally:
            ldap_connection.unbind_s()
        return True

    @api.model
    def change_password(self, old_passwd, new_passwd):
        if self.env.user.is_ldap_user:
            return self.env.user._change_ldap_password(
                new_passwd, auth_dn=self.env.user.ldap_entry_dn,
                auth_passwd=old_passwd)
        return super(ResUsers, self).change_password(old_passwd, new_passwd)