edgewall/trac

View on GitHub
trac/versioncontrol/web_ui/util.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) 2003-2023 Edgewall Software
# Copyright (C) 2003-2005 Jonas Borgström <jonas@edgewall.com>
# Copyright (C) 2005-2007 Christian Boos <cboos@edgewall.org>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Jonas Borgström <jonas@edgewall.com>
#         Christian Boos <cboos@edgewall.org>

from tempfile import TemporaryFile
from zipfile import ZipFile, ZIP_DEFLATED

from trac.resource import ResourceNotFound
from trac.util import content_disposition, create_zipinfo
from trac.util.datefmt import http_date
from trac.util.html import tag
from trac.util.translation import tag_, _
from trac.versioncontrol.api import EmptyChangeset, NoSuchChangeset, \
                                    NoSuchNode
from trac.web.api import RequestDone

__all__ = ['content_closing', 'get_changes', 'get_path_links',
           'get_existing_node', 'get_allowed_node', 'make_log_graph',
           'render_zip']


class content_closing(object):

    def __init__(self, content):
        self.content = content

    def __enter__(self):
        return self.content

    def __exit__(self, *exc_info):
        if hasattr(self.content, 'close'):
            self.content.close()


def get_changes(repos, revs, log=None):
    changes = {}
    for rev in revs:
        if rev in changes:
            continue
        try:
            changeset = repos.get_changeset(rev)
        except NoSuchChangeset:
            changeset = EmptyChangeset(repos, rev)
            if log is not None:
                log.warning("Unable to get changeset [%s] in %s", rev,
                            repos.reponame or '(default)')
        changes[rev] = changeset
    return changes


def get_path_links(href, reponame, path, rev, order=None, desc=None):
    desc = desc or None
    links = [{'name': 'source:',
              'href': href.browser(rev=rev if reponame == '' else None,
                                   order=order, desc=desc)}]
    if reponame:
        links.append({
            'name': reponame,
            'href': href.browser(reponame, rev=rev, order=order, desc=desc)})
    partial_path = ''
    for part in [p for p in path.split('/') if p]:
        partial_path += part + '/'
        links.append({
            'name': part,
            'href': href.browser(reponame or None, partial_path, rev=rev,
                                 order=order, desc=desc)
            })
    return links


def get_existing_node(req, repos, path, rev):
    try:
        return repos.get_node(path, rev)
    except NoSuchNode as e:
        # TRANSLATOR: You can 'search' in the repository history... (link)
        search_a = tag.a(_("search"),
                         href=req.href.log(repos.reponame or None, path,
                                           rev=rev, mode='path_history'))
        raise ResourceNotFound(tag(
            tag.p(e, class_="message"),
            tag.p(tag_("You can %(search)s in the repository history to see "
                       "if that path existed but was later removed",
                       search=search_a))))


def get_allowed_node(repos, path, rev, perm):
    if repos is not None:
        try:
            node = repos.get_node(path, rev)
        except (NoSuchNode, NoSuchChangeset):
            return None
        if node.is_viewable(perm):
            return node


def make_log_graph(repos, revs):
    """Generate graph information for the given revisions.

    Returns a tuple `(threads, vertices, columns)`, where:

     * `threads`: List of paint command lists `[(type, column, line)]`, where
       `type` is either 0 for "move to" or 1 for "line to", and `column` and
       `line` are coordinates.
     * `vertices`: List of `(column, thread_index)` tuples, where the `i`th
       item specifies the column in which to draw the dot in line `i` and the
       corresponding thread.
     * `columns`: Maximum width of the graph.
    """
    threads = []
    vertices = []
    columns = 0
    revs = iter(revs)

    def add_edge(thread, column, line):
        if thread and thread[-1][:2] == [1, column] \
                and thread[-2][1] == column:
            thread[-1][2] = line
        else:
            thread.append([1, column, line])

    try:
        next_rev = next(revs)
        line = 0
        active = []
        active_thread = []
        while True:
            rev = next_rev
            if rev not in active:
                # Insert new head
                threads.append([[0, len(active), line]])
                active_thread.append(threads[-1])
                active.append(rev)

            columns = max(columns, len(active))
            column = active.index(rev)
            vertices.append((column, threads.index(active_thread[column])))

            next_rev = next(revs)  # Raises StopIteration when no more revs
            next_revs = active[:]
            parents = list(repos.parent_revs(rev))

            # Replace current item with parents not already present
            new_parents = [p for p in parents if p not in active]
            next_revs[column:column + 1] = new_parents

            # Add edges to parents
            for col, (r, thread) in enumerate(zip(active, active_thread)):
                if r in next_revs:
                    add_edge(thread, next_revs.index(r), line + 1)
                elif r == rev:
                    if new_parents:
                        parents.remove(new_parents[0])
                        parents.append(new_parents[0])
                    for parent in parents:
                        if parent != parents[0]:
                            thread.append([0, col, line])
                        add_edge(thread, next_revs.index(parent), line + 1)

            if not new_parents:
                del active_thread[column]
            else:
                base = len(threads)
                threads.extend([[0, column + 1 + i, line + 1]]
                               for i in range(len(new_parents) - 1))
                active_thread[column + 1:column + 1] = threads[base:]

            active = next_revs
            line += 1
    except StopIteration:
        pass
    return threads, vertices, columns


def render_zip(req, filename, repos, root_node, iter_nodes):
    """Send a ZIP file containing the data corresponding to the `nodes`
    iterable.

    :type root_node: `~trac.versioncontrol.api.Node`
    :param root_node: optional ancestor for all the *nodes*

    :param iter_nodes: callable taking the optional *root_node* as input
                       and generating the `~trac.versioncontrol.api.Node`
                       for which the content should be added into the zip.
    """
    req.send_response(200)
    req.send_header('Content-Type', 'application/zip')
    req.send_header('Content-Disposition',
                    content_disposition('inline', filename))
    if root_node:
        req.send_header('Last-Modified', http_date(root_node.last_modified))
        root_path = root_node.path.rstrip('/')
    else:
        root_path = ''
    if root_path:
        root_path += '/'
        root_name = root_node.name + '/'
    else:
        root_name = ''
    root_len = len(root_path)
    req.end_headers()

    def write_partial(fileobj, start):
        end = fileobj.tell()
        fileobj.seek(start, 0)
        remaining = end - start
        while remaining > 0:
            chunk = fileobj.read(min(remaining, 4096))
            req.write(chunk)
            remaining -= len(chunk)
        fileobj.seek(end, 0)
        return end

    pos = 0
    with TemporaryFile(prefix='trac-', suffix='.zip') as fileobj:
        with ZipFile(fileobj, 'w', ZIP_DEFLATED) as zipfile:
            for node in iter_nodes(root_node):
                if node is root_node:
                    continue
                path = node.path.strip('/')
                assert path.startswith(root_path)
                path = root_name + path[root_len:]
                kwargs = {'mtime': node.last_modified}
                data = None
                if node.isfile:
                    with content_closing(
                            node.get_processed_content(eol_hint='CRLF')) \
                            as content:
                        data = content.read()
                    props = node.get_properties()
                    # Subversion specific
                    if 'svn:special' in props and data.startswith('link '):
                        data = data[5:]
                        kwargs['symlink'] = True
                    if 'svn:executable' in props:
                        kwargs['executable'] = True
                elif node.isdir and path:
                    kwargs['dir'] = True
                    data = ''
                if data is not None:
                    zipfile.writestr(create_zipinfo(path, **kwargs), data)
                    pos = write_partial(fileobj, pos)
        write_partial(fileobj, pos)
    raise RequestDone