trac/versioncontrol/web_ui/util.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2003-2023 Edgewall Software
# Copyright (C) 2003-2005 Jonas Borgström <jonas@edgewall.com>
# Copyright (C) 2005-2007 Christian Boos <cboos@edgewall.org>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Jonas Borgström <jonas@edgewall.com>
# Christian Boos <cboos@edgewall.org>
from tempfile import TemporaryFile
from zipfile import ZipFile, ZIP_DEFLATED
from trac.resource import ResourceNotFound
from trac.util import content_disposition, create_zipinfo
from trac.util.datefmt import http_date
from trac.util.html import tag
from trac.util.translation import tag_, _
from trac.versioncontrol.api import EmptyChangeset, NoSuchChangeset, \
NoSuchNode
from trac.web.api import RequestDone
__all__ = ['content_closing', 'get_changes', 'get_path_links',
'get_existing_node', 'get_allowed_node', 'make_log_graph',
'render_zip']
class content_closing(object):
def __init__(self, content):
self.content = content
def __enter__(self):
return self.content
def __exit__(self, *exc_info):
if hasattr(self.content, 'close'):
self.content.close()
def get_changes(repos, revs, log=None):
changes = {}
for rev in revs:
if rev in changes:
continue
try:
changeset = repos.get_changeset(rev)
except NoSuchChangeset:
changeset = EmptyChangeset(repos, rev)
if log is not None:
log.warning("Unable to get changeset [%s] in %s", rev,
repos.reponame or '(default)')
changes[rev] = changeset
return changes
def get_path_links(href, reponame, path, rev, order=None, desc=None):
desc = desc or None
links = [{'name': 'source:',
'href': href.browser(rev=rev if reponame == '' else None,
order=order, desc=desc)}]
if reponame:
links.append({
'name': reponame,
'href': href.browser(reponame, rev=rev, order=order, desc=desc)})
partial_path = ''
for part in [p for p in path.split('/') if p]:
partial_path += part + '/'
links.append({
'name': part,
'href': href.browser(reponame or None, partial_path, rev=rev,
order=order, desc=desc)
})
return links
def get_existing_node(req, repos, path, rev):
try:
return repos.get_node(path, rev)
except NoSuchNode as e:
# TRANSLATOR: You can 'search' in the repository history... (link)
search_a = tag.a(_("search"),
href=req.href.log(repos.reponame or None, path,
rev=rev, mode='path_history'))
raise ResourceNotFound(tag(
tag.p(e, class_="message"),
tag.p(tag_("You can %(search)s in the repository history to see "
"if that path existed but was later removed",
search=search_a))))
def get_allowed_node(repos, path, rev, perm):
if repos is not None:
try:
node = repos.get_node(path, rev)
except (NoSuchNode, NoSuchChangeset):
return None
if node.is_viewable(perm):
return node
def make_log_graph(repos, revs):
"""Generate graph information for the given revisions.
Returns a tuple `(threads, vertices, columns)`, where:
* `threads`: List of paint command lists `[(type, column, line)]`, where
`type` is either 0 for "move to" or 1 for "line to", and `column` and
`line` are coordinates.
* `vertices`: List of `(column, thread_index)` tuples, where the `i`th
item specifies the column in which to draw the dot in line `i` and the
corresponding thread.
* `columns`: Maximum width of the graph.
"""
threads = []
vertices = []
columns = 0
revs = iter(revs)
def add_edge(thread, column, line):
if thread and thread[-1][:2] == [1, column] \
and thread[-2][1] == column:
thread[-1][2] = line
else:
thread.append([1, column, line])
try:
next_rev = next(revs)
line = 0
active = []
active_thread = []
while True:
rev = next_rev
if rev not in active:
# Insert new head
threads.append([[0, len(active), line]])
active_thread.append(threads[-1])
active.append(rev)
columns = max(columns, len(active))
column = active.index(rev)
vertices.append((column, threads.index(active_thread[column])))
next_rev = next(revs) # Raises StopIteration when no more revs
next_revs = active[:]
parents = list(repos.parent_revs(rev))
# Replace current item with parents not already present
new_parents = [p for p in parents if p not in active]
next_revs[column:column + 1] = new_parents
# Add edges to parents
for col, (r, thread) in enumerate(zip(active, active_thread)):
if r in next_revs:
add_edge(thread, next_revs.index(r), line + 1)
elif r == rev:
if new_parents:
parents.remove(new_parents[0])
parents.append(new_parents[0])
for parent in parents:
if parent != parents[0]:
thread.append([0, col, line])
add_edge(thread, next_revs.index(parent), line + 1)
if not new_parents:
del active_thread[column]
else:
base = len(threads)
threads.extend([[0, column + 1 + i, line + 1]]
for i in range(len(new_parents) - 1))
active_thread[column + 1:column + 1] = threads[base:]
active = next_revs
line += 1
except StopIteration:
pass
return threads, vertices, columns
def render_zip(req, filename, repos, root_node, iter_nodes):
"""Send a ZIP file containing the data corresponding to the `nodes`
iterable.
:type root_node: `~trac.versioncontrol.api.Node`
:param root_node: optional ancestor for all the *nodes*
:param iter_nodes: callable taking the optional *root_node* as input
and generating the `~trac.versioncontrol.api.Node`
for which the content should be added into the zip.
"""
req.send_response(200)
req.send_header('Content-Type', 'application/zip')
req.send_header('Content-Disposition',
content_disposition('inline', filename))
if root_node:
req.send_header('Last-Modified', http_date(root_node.last_modified))
root_path = root_node.path.rstrip('/')
else:
root_path = ''
if root_path:
root_path += '/'
root_name = root_node.name + '/'
else:
root_name = ''
root_len = len(root_path)
req.end_headers()
def write_partial(fileobj, start):
end = fileobj.tell()
fileobj.seek(start, 0)
remaining = end - start
while remaining > 0:
chunk = fileobj.read(min(remaining, 4096))
req.write(chunk)
remaining -= len(chunk)
fileobj.seek(end, 0)
return end
pos = 0
with TemporaryFile(prefix='trac-', suffix='.zip') as fileobj:
with ZipFile(fileobj, 'w', ZIP_DEFLATED) as zipfile:
for node in iter_nodes(root_node):
if node is root_node:
continue
path = node.path.strip('/')
assert path.startswith(root_path)
path = root_name + path[root_len:]
kwargs = {'mtime': node.last_modified}
data = None
if node.isfile:
with content_closing(
node.get_processed_content(eol_hint='CRLF')) \
as content:
data = content.read()
props = node.get_properties()
# Subversion specific
if 'svn:special' in props and data.startswith('link '):
data = data[5:]
kwargs['symlink'] = True
if 'svn:executable' in props:
kwargs['executable'] = True
elif node.isdir and path:
kwargs['dir'] = True
data = ''
if data is not None:
zipfile.writestr(create_zipinfo(path, **kwargs), data)
pos = write_partial(fileobj, pos)
write_partial(fileobj, pos)
raise RequestDone