akretion/storage

View on GitHub
storage_file/models/storage_file.py

Summary

Maintainability
A
25 mins
Test Coverage
# -*- coding: utf-8 -*-
# Copyright 2017 Akretion (http://www.akretion.com).
# @author Sébastien BEAU <sebastien.beau@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).

import base64
import hashlib
import logging
import mimetypes
import os
import re

from odoo import api, fields, models
from odoo.exceptions import UserError
from odoo.tools import human_size
from odoo.tools.translate import _

_logger = logging.getLogger(__name__)

try:
    from slugify import slugify
except ImportError:  # pragma: no cover
    _logger.debug("Cannot `import slugify`.")


class StorageFile(models.Model):
    _name = "storage.file"
    _description = "Storage File"

    name = fields.Char(required=True, index=True)
    backend_id = fields.Many2one(
        "storage.backend", "Storage", index=True, required=True
    )
    url = fields.Char(
        compute="_compute_url",
        compute_sudo=True,
        store=True,
        help="HTTP accessible path to the file",
    )
    relative_path = fields.Char(
        readonly=True, help="Relative location for backend"
    )
    file_size = fields.Integer("File Size")
    human_file_size = fields.Char(
        "Human File Size", compute="_compute_human_file_size", store=True
    )
    checksum = fields.Char("Checksum/SHA1", size=40, index=True, readonly=True)
    filename = fields.Char(
        "Filename without extension",
        compute="_compute_extract_filename",
        store=True,
    )
    extension = fields.Char(
        "Extension", compute="_compute_extract_filename", store=True
    )
    mimetype = fields.Char(
        "Mime Type", compute="_compute_extract_filename", store=True
    )
    data = fields.Binary(
        help="Datas",
        inverse="_inverse_data",
        compute="_compute_data",
        store=False,
    )
    to_delete = fields.Boolean()
    active = fields.Boolean(default=True)
    company_id = fields.Many2one(
        "res.company",
        "Company",
        default=lambda self: self.env.user.company_id.id,
    )
    file_type = fields.Selection([])

    _sql_constraints = [
        ("url_uniq", "unique(url)", "The url must be uniq"),
        (
            "path_uniq",
            "unique(relative_path, backend_id)",
            "The private path must be uniq per backend",
        ),
    ]

    def write(self, vals):
        if "data" in vals:
            for record in self:
                if record.data:
                    raise UserError(
                        _(
                            "File can not be updated,"
                            "remove it and create a new one"
                        )
                    )
        return super(StorageFile, self).write(vals)

    @api.depends("file_size")
    def _compute_human_file_size(self):
        for record in self:
            record.human_file_size = human_size(self.file_size)

    def _slugify_name_with_id(self):
        return u"{}{}".format(
            slugify(u"{}-{}".format(self.filename, self.id)), self.extension
        )

    def _build_relative_path(self, checksum):
        self.ensure_one()
        strategy = self.sudo().backend_id.filename_strategy
        if not strategy:
            raise UserError(
                _(
                    "The filename strategy is empty for the backend %s.\n"
                    "Please configure it"
                )
                % self.backend_id.name
            )
        if strategy == "hash":
            return checksum[:2] + "/" + checksum
        elif strategy == "name_with_id":
            return self._slugify_name_with_id()

    def _prepare_meta_for_file(self):
        bin_data = base64.b64decode(self.data)
        checksum = hashlib.sha1(bin_data).hexdigest()
        relative_path = self._build_relative_path(checksum)
        return {
            "checksum": checksum,
            "file_size": len(bin_data),
            "relative_path": relative_path,
        }

    def _inverse_data(self):
        for record in self:
            record.write(record._prepare_meta_for_file())
            record.backend_id.sudo()._add_b64_data(
                record.relative_path, record.data, mimetype=record.mimetype
            )

    def _compute_data(self):
        for rec in self:
            if self._context.get("bin_size"):
                rec.data = rec.file_size
            elif rec.relative_path:
                rec.data = rec.backend_id.sudo()._get_b64_data(
                    rec.relative_path
                )
            else:
                rec.data = None

    @api.depends(
        "backend_id.served_by", "backend_id.base_url", "relative_path"
    )
    def _compute_url(self):
        for record in self:
            if record.backend_id.served_by == "odoo":
                base_url = (
                    self.env["ir.config_parameter"]
                    .sudo()
                    .get_param("web.base.url")
                )
                record.url = u"{}/storage.file/{}".format(
                    base_url, record._slugify_name_with_id()
                )
            else:
                record.url = "{}/{}".format(
                    record.backend_id.base_url, record.relative_path
                )

    @api.depends("name")
    def _compute_extract_filename(self):
        for rec in self:
            rec.filename, rec.extension = os.path.splitext(rec.name)
            mime, enc = mimetypes.guess_type(rec.name)
            rec.mimetype = mime

    def unlink(self):
        if self._context.get("cleanning_storage_file"):
            super(StorageFile, self).unlink()
        else:
            self.write({"to_delete": True, "active": False})
        return True

    @api.model
    def _clean_storage_file(self):
        self._cr.execute(
            """SELECT id
            FROM storage_file
            WHERE to_delete=True FOR UPDATE"""
        )
        ids = [x[0] for x in self._cr.fetchall()]
        for st_file in self.browse(ids):
            st_file.backend_id.sudo()._delete(st_file.relative_path)
            st_file.with_context(cleanning_storage_file=True).unlink()
            st_file._cr.commit()

    @api.model
    def get_from_slug_name_with_id(self, slug_name_with_id):
        """
        Return a browse record from a string generated by the method
        _slugify_name_with_id
        :param slug_name_with_id:
        :return: a BrowseRecord (could be empty...)
        """
        # id is the last group of digit after '-'
        _id = re.findall(r"-([0-9]+)", slug_name_with_id)[-1:]
        if _id:
            _id = int(_id[0])
        return self.browse(_id)