OCA/l10n-italy

View on GitHub
l10n_it_fatturapa_pec/models/mail_thread.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
# Author(s): Andrea Colangelo (andreacolangelo@openforce.it)
# Copyright 2018 Openforce Srls Unipersonale (www.openforce.it)
# Copyright 2018 Sergio Corato (https://efatto.it)
# Copyright 2018 Lorenzo Battistini <https://github.com/eLBati>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import logging
import re
import base64
import zipfile
import io

from openerp import api, models, _
from openerp.exceptions import Warning as UserError

_logger = logging.getLogger(__name__)

# File name of an e-invoice file: <country code + identifier>_<progressive>
# followed by an xml / zip / p7m extension, optionally with a trailing
# ".p7m" extension.
FATTURAPA_IN_REGEX = (
    r'^(IT[a-zA-Z0-9]{11,16}|'
    r'(?!IT)[A-Z]{2}[a-zA-Z0-9]{2,28})'
    r'_[a-zA-Z0-9]{1,5}'
    r'\.(xml|XML|Xml|zip|ZIP|Zip|p7m|P7M|P7m)'
    r'(\.(p7m|P7M|P7m))?$'
)

# Same name prefix, followed by "_MT_" and up to three alphanumeric
# characters. It is not anchored at the end, so any suffix is accepted.
RESPONSE_MAIL_REGEX = (
    r'(IT[a-zA-Z0-9]{11,16}|'
    r'(?!IT)[A-Z]{2}[a-zA-Z0-9]{2,28})'
    r'_[a-zA-Z0-9]{1,5}'
    r'_MT_[a-zA-Z0-9]{,3}'
)

# Compiled once at import time; both are used with ``.match()``.
fatturapa_regex = re.compile(FATTURAPA_IN_REGEX)
response_regex = re.compile(RESPONSE_MAIL_REGEX)


class MailThread(models.AbstractModel):
    _inherit = 'mail.thread'

    def _create_message_attachments(self, message_dict):
        """Create one ``ir.attachment`` per attachment of a parsed message.

        :param message_dict: parsed message; ``message_dict['attachments']``
            is iterated as ``(name, content)`` pairs, while the optional
            ``model`` and ``res_id`` keys (default ``False``) say which
            record the attachments are linked to.
        :return: list with the result of each ``ir.attachment`` ``create``
            call, in the same order as the incoming attachments.
        """
        linked_model = message_dict.get('model', False)
        linked_res_id = message_dict.get('res_id', False)
        attachment_model = self.env['ir.attachment']

        created = []
        for filename, payload in message_dict['attachments']:
            # Text payloads are turned into UTF-8 bytes before being
            # base64-encoded.
            if isinstance(payload, unicode):
                payload = payload.encode('utf-8')
            created.append(attachment_model.create({
                'name': filename,
                'datas': base64.b64encode(str(payload)),
                'datas_fname': filename,
                'description': filename,
                'res_model': linked_model,
                'res_id': linked_res_id,
            }))
        return created

    def clean_message_dict(self, message_dict):
        """Strip, in place, the envelope keys from a parsed message dict.

        The ``attachments``, ``cc``, ``from`` and ``to`` keys are removed.
        The callers in this class invoke this right before passing the
        dict to ``mail.message`` ``create``.

        Keys that are already absent are ignored, so the method can safely
        be called on a dict that has been rebuilt or already cleaned.

        :param message_dict: parsed message dict, modified in place.
        :return: None
        """
        for envelope_key in ('attachments', 'cc', 'from', 'to'):
            message_dict.pop(envelope_key, None)

    @api.model
    def message_route(self, message, message_dict, model=None, thread_id=None,
                      custom_values=None):
        """Route incoming e-mails, intercepting the FatturaPA PEC ones.

        * A message whose ``Reply-To``, ``From`` or ``Return-Path`` header
          contains ``@pec.fatturapa.it`` is handled here. If it carries
          both an attachment matching ``FATTURAPA_IN_REGEX`` and one
          matching ``RESPONSE_MAIL_REGEX`` it is processed by
          ``manage_pec_fe_attachments``, otherwise by
          ``manage_pec_sdi_notification``.
        * Any other message fetched through a fetchmail server flagged
          ``is_fatturapa_pec`` must match, by subject, an outgoing
          e-invoice attachment (see ``find_attachment_by_subject``); it is
          then handled by ``manage_pec_sdi_response``.
        * Everything else is delegated to the standard routing.

        :param message: the e-mail message object (headers are read with
            ``message.get``).
        :param message_dict: the parsed message dict.
        :param model: forwarded unchanged to the standard routing.
        :param thread_id: forwarded unchanged to the standard routing.
        :param custom_values: forwarded unchanged to the standard routing.
        :return: the value returned by the selected handler (``[]`` for
            the handlers defined in this class) or by the standard routing.
        :raises UserError: when a message fetched from a FatturaPA PEC
            server cannot be related to an e-invoice.
        """
        sender_headers = (
            message.get('Reply-To', ''),
            message.get('From', ''),
            message.get('Return-Path', ''),
        )
        if any("@pec.fatturapa.it" in header for header in sender_headers):
            _logger.info("Processing FatturaPA PEC with Message-Id: %s",
                         message.get('Message-Id'))
            fatturapa_attachments = [x for x in message_dict['attachments']
                                     if fatturapa_regex.match(x[0])]
            response_attachments = [x for x in message_dict['attachments']
                                    if response_regex.match(x[0])]
            if response_attachments and fatturapa_attachments:
                return self.manage_pec_fe_attachments(
                    message, message_dict, response_attachments)
            else:
                return self.manage_pec_sdi_notification(message, message_dict)

        elif self._context.get('fetchmail_server_id', False):
            # This is not an email coming from SDI
            fetchmail_server = self.env['fetchmail.server'].browse(
                self._context['fetchmail_server_id'])
            if fetchmail_server.is_fatturapa_pec:
                # A message without subject cannot match any attachment:
                # treat it as an empty subject so that the explanatory
                # UserError below is raised instead of a bare KeyError.
                subject = message_dict.get('subject') or ''
                att = self.find_attachment_by_subject(subject)
                if att:
                    return self.manage_pec_sdi_response(att, message_dict)
                raise UserError(_(
                    "PEC message with Message-Id %s has been read "
                    "but not processed, as not related to an "
                    "e-invoice.\n"
                    "Please check PEC mailbox %s, at server %s,"
                    " with user %s."
                ) % (
                    message.get('Message-Id'),
                    fetchmail_server.name, fetchmail_server.server,
                    fetchmail_server.user
                ))
        return super(MailThread, self).message_route(
            message, message_dict, model=model, thread_id=thread_id,
            custom_values=custom_values)

    def manage_pec_sdi_response(self, att, message_dict):
        """Record a PEC receipt as a message on an outgoing attachment.

        The message is a PEC response (delivery or acceptance receipt)
        related to a message that we sent to SDI.

        :param att: the ``fatturapa.attachment.out`` record the receipt
            refers to; its ``id`` becomes the ``res_id`` of the message.
        :param message_dict: parsed message dict, modified in place.
        :return: an empty list.
        """
        message_dict.update({
            'model': 'fatturapa.attachment.out',
            'res_id': att.id,
        })
        self.clean_message_dict(message_dict)
        message_model = self.env['mail.message'].with_context(
            message_create_from_mail_mail=True)
        message_model.create(message_dict)
        return []

    def manage_pec_sdi_notification(self, message, message_dict):
        """Store an SDI notification as a ``mail.message``.

        The parsed dict is first handed to
        ``fatturapa.attachment.out.parse_pec_response``, whose return value
        replaces it; the attachments are then saved and the message is
        created.

        :param message: the e-mail message object (only its ``Message-Id``
            header is read, for logging).
        :param message_dict: parsed message dict.
        :return: an empty list.
        """
        attachment_out_model = self.env['fatturapa.attachment.out']
        message_dict = attachment_out_model.parse_pec_response(message_dict)
        message_dict['record_name'] = message_dict['subject']
        message_dict['attachment_ids'] = \
            self._create_message_attachments(message_dict)
        self.clean_message_dict(message_dict)
        # The message_create_from_mail_mail context key is set to avoid
        # notifying the message (see mail.message.create).
        message_model = self.env['mail.message'].with_context(
            message_create_from_mail_mail=True)
        message_model.create(message_dict)
        message_id = message.get('Message-Id')
        _logger.info('Routing FatturaPA PEC E-Mail with Message-Id: {}'
                     .format(message_id))
        return []

    def manage_pec_fe_attachments(self, message, message_dict,
                                  response_attachments):
        """Process a PEC e-mail that carries an incoming electronic invoice.

        Every attachment is saved as ``ir.attachment``; those whose name
        matches the FatturaPA file-name pattern are also passed to
        ``create_fatturapa_attachment_in``. The e-mail itself is then
        stored as a ``mail.message``.

        :param message: the e-mail message object (only its ``Message-Id``
            header is read, for logging).
        :param message_dict: parsed message dict, modified in place.
        :param response_attachments: attachments matching
            ``RESPONSE_MAIL_REGEX``; only their number is checked here.
        :return: an empty list.
        """
        if len(response_attachments) > 1:
            _logger.info(
                'More than 1 message found in mail of incoming invoice')

        message_dict['record_name'] = message_dict['subject']
        message_dict['model'] = 'fatturapa.attachment.in'
        message_dict['res_id'] = 0

        saved_attachments = self._create_message_attachments(message_dict)
        for saved in saved_attachments:
            if not fatturapa_regex.match(saved.name):
                continue
            self.create_fatturapa_attachment_in(saved)
        message_dict['attachment_ids'] = saved_attachments

        self.clean_message_dict(message_dict)
        # model and res_id were only set for the attachment creation above:
        # they are dropped so that the message is not linked to a record.
        for record_key in ('model', 'res_id'):
            del message_dict[record_key]

        # The message_create_from_mail_mail context key is set to avoid
        # notifying the message (see mail.message.create).
        message_model = self.env['mail.message'].with_context(
            message_create_from_mail_mail=True)
        message_model.create(message_dict)
        message_id = message.get('Message-Id')
        _logger.info('Routing FatturaPA PEC E-Mail with Message-Id: {}'
                     .format(message_id))
        return []

    def find_attachment_by_subject(self, subject):
        """Find the outgoing e-invoice attachment a PEC receipt refers to.

        For each of the ``CONSEGNA: `` and ``ACCETTAZIONE: `` markers found
        in ``subject`` (checked in this order), the marker is removed from
        the subject and the remainder is searched as ``datas_fname`` of
        ``fatturapa.attachment.out``.

        :param subject: subject of the received e-mail.
        :return: the matching record when a search finds exactly one,
            otherwise an empty ``fatturapa.attachment.out`` recordset.
        """
        out_model = self.env['fatturapa.attachment.out']
        for marker in ('CONSEGNA: ', 'ACCETTAZIONE: '):
            if marker not in subject:
                continue
            file_name = subject.replace(marker, '')
            matches = out_model.search([('datas_fname', '=', file_name)])
            if len(matches) == 1:
                return matches
        return out_model.browse()

    def create_fatturapa_attachment_in(self, attachment):
        """Create ``fatturapa.attachment.in`` record(s) for a received file.

        When ``attachment.file_type`` is ``application/zip`` the archive is
        opened in memory and one record is created for each member whose
        name matches the FatturaPA file-name pattern. Otherwise a single
        record linked to ``attachment`` is created. In both cases a file
        whose name is already used by an existing
        ``fatturapa.attachment.in`` is skipped and logged.

        If the context holds a ``fetchmail_server_id`` bound to an SDI
        channel, the records get that channel's company and, when the
        company has an e-invoice user, they are created as that user.

        :param attachment: the ``ir.attachment`` record holding the file.
        :return: None
        """
        fatturapa_attachment_in = self.env['fatturapa.attachment.in']
        fetchmail_server_id = self.env.context.get('fetchmail_server_id')
        company_id = False
        e_invoice_user_id = False
        if fetchmail_server_id:
            sdi_chan = self.env['sdi.channel'].search([
                ('fetch_pec_server_id', '=', fetchmail_server_id)], limit=1)
            if sdi_chan:
                # See check_fetch_pec_server_id
                company_id = sdi_chan.company_id.id
                e_invoice_user_id = sdi_chan.company_id.e_invoice_user_id.id
        if e_invoice_user_id:
            fatturapa_attachment_in = fatturapa_attachment_in.sudo(
                e_invoice_user_id)
        if attachment.file_type == 'application/zip':
            # The payload is decoded only here: the non-zip branch links
            # the ir.attachment directly and never needs the raw bytes.
            decoded = base64.b64decode(attachment.datas)
            with zipfile.ZipFile(io.BytesIO(decoded)) as zf:
                for file_name in zf.namelist():
                    if not fatturapa_regex.match(file_name):
                        continue
                    # check if this invoice is already
                    # in other fatturapa.attachment.in
                    fatturapa_atts = fatturapa_attachment_in.search([
                        ('name', '=', file_name)])
                    if fatturapa_atts:
                        _logger.info("In invoice %s already processed",
                                     fatturapa_atts.mapped('name'))
                        continue
                    # ZipFile.read opens, reads and closes the member, so
                    # no member handle is left open; members that are
                    # skipped above are never opened at all.
                    fatturapa_attachment_in.create({
                        'name': file_name,
                        'datas_fname': file_name,
                        'datas': base64.b64encode(zf.read(file_name)),
                        'company_id': company_id,
                    })
        else:
            fatturapa_atts = fatturapa_attachment_in.search(
                [('name', '=', attachment.name)])
            if fatturapa_atts:
                _logger.info(
                    "Invoice xml already processed in %s",
                    fatturapa_atts.mapped('name'))
            else:
                fatturapa_attachment_in.create({
                    'ir_attachment_id': attachment.id,
                    'company_id': company_id,
                })