OCA/server-tools

View on GitHub
auth_brute_force/controllers/controllers.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- encoding: utf-8 -*-
##############################################################################
#
#    Tracks Authentication Attempts and Prevents Brute-force Attacks module
#    Copyright (C) 2015-Today GRAP (http://www.grap.coop)
#    @author Sylvain LE GAL (https://twitter.com/legalsylvain)
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

import logging

from openerp import fields, http, registry, SUPERUSER_ID
from openerp.http import request
from openerp.addons.web.controllers.main import Home, ensure_db

_logger = logging.getLogger(__name__)


class LoginController(Home):
    @http.route()
    def web_login(self, redirect=None, **kw):
        if request.httprequest.method == 'POST':
            ensure_db()
            remote = request.httprequest.remote_addr
            # Get registry and cursor
            config_obj = registry(request.session.db)['ir.config_parameter']
            attempt_obj = registry(
                request.session.db)['res.authentication.attempt']
            banned_remote_obj = registry(
                request.session.db)['res.banned.remote']
            cursor = attempt_obj.pool.cursor()

            # Get Settings
            max_attempts_qty = int(config_obj.search_read(
                cursor, SUPERUSER_ID,
                [('key', '=', 'auth_brute_force.max_attempt_qty')],
                ['value'])[0]['value'])

            environ_log = config_obj.search_read(
                cursor, SUPERUSER_ID,
                [('key', '=', 'auth_brute_force.environ_log')],
                ['value'])

            # Test if remote user is banned
            banned = banned_remote_obj.search(cursor, SUPERUSER_ID, [
                ('remote', '=', remote)])
            if banned:
                _logger.warning(
                    "Authentication tried from remote '%s'. The request has"
                    " been ignored because the remote has been banned after"
                    " %d attempts without success. Login tried : '%s'." % (
                        remote, max_attempts_qty, request.params['login']))
                request.params['password'] = ''

            else:
                # Try to authenticate
                result = request.session.authenticate(
                    request.session.db, request.params['login'],
                    request.params['password'])

            # Log attempt
            cursor.commit()

            environ = ''
            if environ_log:
                filter_value = environ_log[0]['value']
                filter_keys = [k.strip() for k in filter_value.split(',')]
                for key, value in request.httprequest.environ.items():
                    if key in filter_keys or filter_value == '*':
                        environ += '%s=%s\n' % (key, value)

            attempt_obj.create(cursor, SUPERUSER_ID, {
                'attempt_date': fields.Datetime.now(),
                'login': request.params['login'],
                'remote': remote,
                'environ': environ,
                'result': banned and 'banned' or (
                    result and 'successfull' or 'failed'),
            })
            cursor.commit()
            if not banned and not result:
                # Get last bad attempts quantity
                attempts_qty = len(attempt_obj.search_last_failed(
                    cursor, SUPERUSER_ID, remote))

                if max_attempts_qty <= attempts_qty:
                    # We ban the remote
                    _logger.warning(
                        "Authentication failed from remote '%s'. "
                        "The remote has been banned. Login tried : '%s'." % (
                            remote, request.params['login']))
                    banned_remote_obj.create(cursor, SUPERUSER_ID, {
                        'remote': remote,
                        'ban_date': fields.Datetime.now(),
                    })
                    cursor.commit()

                else:
                    _logger.warning(
                        "Authentication failed from remote '%s'."
                        " Login tried : '%s'. Attempt %d / %d." % (
                            remote, request.params['login'], attempts_qty,
                            max_attempts_qty))
            cursor.close()

        return super(LoginController, self).web_login(redirect=redirect, **kw)