trac/admin/web_ui.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2005-2023 Edgewall Software
# Copyright (C) 2005 Jonas Borgström <jonas@edgewall.com>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/.
#
# Author: Jonas Borgström <jonas@edgewall.com>
import itertools
import os
import pkg_resources
import re
import shutil
from functools import partial
from trac import log
from trac.admin.api import IAdminPanelProvider
from trac.core import *
from trac.loader import get_plugin_info
from trac.log import LOG_LEVELS, LOG_LEVEL_ALIASES, LOG_LEVEL_ALIASES_MAP
from trac.perm import IPermissionRequestor, PermissionExistsError, \
PermissionSystem
from trac.util.datefmt import all_timezones, pytz
from trac.util.html import tag
from trac.util.text import exception_to_unicode, unicode_from_base64, \
unicode_to_base64
from trac.util.translation import _, Locale, get_available_locales, \
ngettext, tag_
from trac.web.api import HTTPNotFound, IRequestHandler, \
is_valid_default_handler
from trac.web.chrome import Chrome, INavigationContributor, \
ITemplateProvider, add_notice, add_stylesheet, \
add_warning
from trac.wiki.formatter import format_to_html
_valid_log_levels = set()
_valid_log_levels.update(log.LOG_LEVELS)
_valid_log_levels.update(log.LOG_LEVEL_ALIASES)
class AdminModule(Component):
"""Web administration interface provider and panel manager."""
implements(INavigationContributor, IRequestHandler, ITemplateProvider)
panel_providers = ExtensionPoint(IAdminPanelProvider)
# INavigationContributor methods
def get_active_navigation_item(self, req):
return 'admin'
def get_navigation_items(self, req):
# The 'Admin' navigation item is only visible if at least one
# admin panel is available
panels, providers = self._get_panels(req)
if panels:
yield 'mainnav', 'admin', tag.a(_("Admin"), href=req.href.admin())
# IRequestHandler methods
def match_request(self, req):
match = re.match('/admin(?:/([^/]+)(?:/([^/]+)(?:/(.+))?)?)?$',
req.path_info)
if match:
req.args['cat_id'] = match.group(1)
req.args['panel_id'] = match.group(2)
req.args['path_info'] = match.group(3)
return True
def process_request(self, req):
panels, providers = self._get_panels(req)
if not panels:
raise HTTPNotFound(_("No administration panels available"))
def _panel_order(panel):
items = panel[::2]
return items[0] != 'general', items != ('general', 'basics'), items
panels.sort(key=_panel_order)
cat_id = req.args.get('cat_id') or panels[0][0]
panel_id = req.args.get('panel_id')
path_info = req.args.get('path_info')
if not panel_id:
try:
panel_id = list(filter(lambda panel: panel[0] == cat_id,
panels))[0][2]
except IndexError:
raise HTTPNotFound(_("Unknown administration panel"))
provider = providers.get((cat_id, panel_id))
if not provider:
raise HTTPNotFound(_("Unknown administration panel"))
resp = provider.render_admin_panel(req, cat_id, panel_id, path_info)
template, data = resp[:2]
data.update({
'active_cat': cat_id, 'active_panel': panel_id,
'panel_href': partial(req.href, 'admin', cat_id, panel_id),
'panels': itertools.groupby([{
'category': {'id': panel[0], 'label': panel[1]},
'panel': {'id': panel[2], 'label': panel[3]}
} for panel in panels], lambda k: k['category']),
})
add_stylesheet(req, 'common/css/admin.css')
return resp
# ITemplateProvider methods
def get_htdocs_dirs(self):
return []
def get_templates_dirs(self):
return [pkg_resources.resource_filename('trac.admin', 'templates')]
# Internal methods
def _get_panels(self, req):
"""Return a list of available admin panels."""
panels = []
providers = {}
for provider in self.panel_providers:
p = list(provider.get_admin_panels(req) or [])
for panel in p:
providers[(panel[0], panel[2])] = provider
panels += p
return panels, providers
def _save_config(config, req, log, notices=None):
"""Try to save the config, and display either a success notice or a
failure warning.
"""
try:
config.save()
if notices is None:
notices = [_("Your changes have been saved.")]
for notice in notices:
add_notice(req, notice)
except Exception as e:
log.error("Error writing to trac.ini: %s", exception_to_unicode(e))
add_warning(req, _("Error writing to trac.ini, make sure it is "
"writable by the web server. Your changes have "
"not been saved."))
class BasicsAdminPanel(Component):
implements(IAdminPanelProvider)
request_handlers = ExtensionPoint(IRequestHandler)
# IAdminPanelProvider methods
def get_admin_panels(self, req):
if 'TRAC_ADMIN' in req.perm('admin', 'general/basics'):
yield ('general', _("General"), 'basics', _("Basic Settings"))
def render_admin_panel(self, req, cat, page, path_info):
valid_default_handlers = [handler.__class__.__name__
for handler in self.request_handlers
if is_valid_default_handler(handler)]
if Locale:
locale_ids = get_available_locales()
locales = [Locale.parse(locale) for locale in locale_ids]
# don't use str(locale) to prevent storing expanded locale
# identifier, see #11258
languages = sorted((id, locale.display_name)
for id, locale in zip(locale_ids, locales))
else:
locale_ids, locales, languages = [], [], []
if req.method == 'POST':
for option in ('name', 'url', 'descr'):
self.config.set('project', option, req.args.get(option))
default_handler = req.args.get('default_handler')
self.config.set('trac', 'default_handler', default_handler)
default_timezone = req.args.get('default_timezone')
if default_timezone not in all_timezones:
default_timezone = ''
self.config.set('trac', 'default_timezone', default_timezone)
default_language = req.args.get('default_language')
if default_language not in locale_ids:
default_language = ''
self.config.set('trac', 'default_language', default_language)
default_date_format = req.args.get('default_date_format')
if default_date_format != 'iso8601':
default_date_format = ''
self.config.set('trac', 'default_date_format',
default_date_format)
default_dateinfo_format = req.args.get('default_dateinfo_format')
if default_dateinfo_format not in ('relative', 'absolute'):
default_dateinfo_format = 'relative'
self.config.set('trac', 'default_dateinfo_format',
default_dateinfo_format)
_save_config(self.config, req, self.log)
req.redirect(req.href.admin(cat, page))
default_handler = self.config.get('trac', 'default_handler')
default_timezone = self.config.get('trac', 'default_timezone')
default_language = self.config.get('trac', 'default_language')
default_date_format = self.config.get('trac', 'default_date_format')
default_dateinfo_format = self.config.get('trac',
'default_dateinfo_format')
data = {
'default_handler': default_handler,
'valid_default_handlers': sorted(valid_default_handlers),
'default_timezone': default_timezone,
'timezones': all_timezones,
'has_pytz': pytz is not None,
'default_language': default_language.replace('-', '_'),
'languages': languages,
'default_date_format': default_date_format,
'default_dateinfo_format': default_dateinfo_format,
'has_babel': Locale is not None,
}
Chrome(self.env).add_textarea_grips(req)
return 'admin_basics.html', data
class LoggingAdminPanel(Component):
implements(IAdminPanelProvider)
# IAdminPanelProvider methods
def get_admin_panels(self, req):
if 'TRAC_ADMIN' in req.perm('admin', 'general/logging'):
yield ('general', _("General"), 'logging', _("Logging"))
def render_admin_panel(self, req, cat, page, path_info):
log_type = self.env.log_type
log_level = self.env.log_level
log_file = self.env.log_file
log_dir = self.env.log_dir
log_types = [
dict(name='none', label=_("None"),
selected=log_type == 'none', disabled=False),
dict(name='stderr', label=_("Console"),
selected=log_type == 'stderr', disabled=False),
dict(name='file', label=_("File"),
selected=log_type == 'file', disabled=False),
dict(name='syslog', label=_("Syslog"),
selected=log_type in ('unix', 'syslog'),
disabled=os.name != 'posix'),
dict(name='eventlog', label=_("Windows event log"),
selected=log_type in ('winlog', 'eventlog', 'nteventlog'),
disabled=os.name != 'nt'),
]
if req.method == 'POST':
changed = False
new_type = req.args.get('log_type')
if new_type not in [t['name'] for t in log_types]:
raise TracError(
_("Unknown log type %(type)s", type=new_type),
_("Invalid log type")
)
new_file = req.args.get('log_file', log_file)
if not new_file:
raise TracError(_("You must specify a log file"),
_("Missing field"))
new_level = req.args.get('log_level', log_level)
if new_level not in _valid_log_levels:
raise TracError(
_("Unknown log level %(level)s", level=new_level),
_("Invalid log level"))
# Create logger to be sure the configuration is valid.
new_file_path = new_file
if not os.path.isabs(new_file_path):
new_file_path = os.path.join(self.env.log_dir, new_file)
try:
logger, handler = \
self.env.create_logger(new_type, new_file_path, new_level,
self.env.log_format)
except Exception as e:
add_warning(req,
tag_("Changes not saved. Logger configuration "
"error: %(error)s. Inspect the log for more "
"information.",
error=tag.code(exception_to_unicode(e))))
self.log.error("Logger configuration error: %s",
exception_to_unicode(e, traceback=True))
else:
handler.close()
if new_type != log_type:
self.config.set('logging', 'log_type', new_type)
changed = True
log_type = new_type
if new_level != log_level:
self.config.set('logging', 'log_level', new_level)
changed = True
log_level = new_level
if new_file != log_file:
self.config.set('logging', 'log_file', new_file)
changed = True
log_file = new_file
if changed:
_save_config(self.config, req, self.log),
req.redirect(req.href.admin(cat, page))
# Order log levels by priority value, with aliases excluded.
all_levels = sorted(log.LOG_LEVEL_MAP, key=log.LOG_LEVEL_MAP.get,
reverse=True)
log_levels = [level for level in all_levels if level in log.LOG_LEVELS]
log_level = LOG_LEVEL_ALIASES_MAP.get(log_level, log_level)
data = {
'type': log_type, 'types': log_types,
'level': log_level, 'levels': log_levels,
'file': log_file, 'dir': log_dir
}
return 'admin_logging.html', {'log': data}
class PermissionAdminPanel(Component):
implements(IAdminPanelProvider, IPermissionRequestor)
# IPermissionRequestor methods
def get_permission_actions(self):
actions = ['PERMISSION_GRANT', 'PERMISSION_REVOKE']
return actions + [('PERMISSION_ADMIN', actions)]
# IAdminPanelProvider methods
def get_admin_panels(self, req):
perm = req.perm('admin', 'general/perm')
if 'PERMISSION_GRANT' in perm or 'PERMISSION_REVOKE' in perm:
yield ('general', _("General"), 'perm', _("Permissions"))
def render_admin_panel(self, req, cat, page, path_info):
perm = PermissionSystem(self.env)
all_actions = perm.get_actions()
if req.method == 'POST':
subject = req.args.get('subject', '').strip()
target = req.args.get('target', '').strip()
action = req.args.get('action')
group = req.args.get('group', '').strip()
if subject and subject.isupper() or \
group and group.isupper() or \
target and target.isupper():
raise TracError(_("All upper-cased tokens are reserved for "
"permission names."))
# Grant permission to subject
if 'add' in req.args and subject and action:
req.perm('admin', 'general/perm').require('PERMISSION_GRANT')
if action not in all_actions:
raise TracError(_("Unknown action"))
req.perm.require(action)
try:
perm.grant_permission(subject, action)
except TracError as e:
add_warning(req, e)
else:
add_notice(req, _("The subject %(subject)s has been "
"granted the permission %(action)s.",
subject=subject, action=action))
# Add subject to group
elif 'add' in req.args and subject and group:
req.perm('admin', 'general/perm').require('PERMISSION_GRANT')
for action in sorted(
perm.get_user_permissions(group, expand_meta=False)):
req.perm.require(action,
message=tag_(
"The subject %(subject)s was not added to the "
"group %(group)s. The group has %(perm)s "
"permission and you cannot grant permissions you "
"don't possess.", subject=tag.strong(subject),
group=tag.strong(group), perm=tag.strong(action)))
try:
perm.grant_permission(subject, group)
except TracError as e:
add_warning(req, e)
else:
add_notice(req, _("The subject %(subject)s has been "
"added to the group %(group)s.",
subject=subject, group=group))
# Copy permissions to subject
elif 'copy' in req.args and subject and target:
req.perm('admin', 'general/perm').require('PERMISSION_GRANT')
subject_permissions = perm.get_users_dict().get(subject, [])
if not subject_permissions:
add_warning(req, _("The subject %(subject)s does not "
"have any permissions.",
subject=subject))
for action in subject_permissions:
if action not in all_actions: # plugin disabled?
self.log.warning("Skipped granting %s to %s: "
"permission unavailable.",
action, target)
else:
if action not in req.perm:
add_warning(req,
_("The permission %(action)s was "
"not granted to %(subject)s "
"because users cannot grant "
"permissions they don't possess.",
action=action, subject=subject))
continue
try:
perm.grant_permission(target, action)
except PermissionExistsError:
pass
else:
add_notice(req, _("The subject %(subject)s has "
"been granted the permission "
"%(action)s.",
subject=target, action=action))
req.redirect(req.href.admin(cat, page))
# Remove permissions action
elif 'remove' in req.args and 'sel' in req.args:
req.perm('admin', 'general/perm').require('PERMISSION_REVOKE')
for key in req.args.getlist('sel'):
subject, action = key.split(':', 1)
subject = unicode_from_base64(subject)
action = unicode_from_base64(action)
if (subject, action) in perm.get_all_permissions():
perm.revoke_permission(subject, action)
add_notice(req, _("The selected permissions have been "
"revoked."))
req.redirect(req.href.admin(cat, page))
return 'admin_perms.html', {
'actions': all_actions,
'allowed_actions': [a for a in all_actions if a in req.perm],
'perms': perm.get_users_dict(),
'groups': perm.get_groups_dict(),
'unicode_to_base64': unicode_to_base64
}
class PluginAdminPanel(Component):
implements(IAdminPanelProvider)
# IAdminPanelProvider methods
def get_admin_panels(self, req):
if 'TRAC_ADMIN' in req.perm('admin', 'general/plugin'):
yield ('general', _("General"), 'plugin', _("Plugins"))
def render_admin_panel(self, req, cat, page, path_info):
if req.method == 'POST':
if 'install' in req.args:
self._do_install(req)
elif 'uninstall' in req.args:
self._do_uninstall(req)
else:
self._do_update(req)
anchor = ''
if 'plugin' in req.args:
anchor = '#no%d' % (req.args.getint('plugin') + 1)
req.redirect(req.href.admin(cat, page) + anchor)
return self._render_view(req)
# Internal methods
def _do_install(self, req):
"""Install a plugin."""
if 'plugin_file' not in req.args:
raise TracError(_("No file uploaded"))
upload = req.args['plugin_file']
if isinstance(upload, str) or not upload.filename:
raise TracError(_("No file uploaded"))
plugin_filename = upload.filename.replace('\\', '/').replace(':', '/')
plugin_filename = os.path.basename(plugin_filename)
if not plugin_filename:
raise TracError(_("No file uploaded"))
if not plugin_filename.endswith('.egg') and \
not plugin_filename.endswith('.py'):
raise TracError(_("Uploaded file is not a Python source file or "
"egg"))
target_path = os.path.join(self.env.plugins_dir, plugin_filename)
if os.path.isfile(target_path):
raise TracError(_("Plugin %(name)s already installed",
name=plugin_filename))
self.log.info("Installing plugin %s", plugin_filename)
flags = os.O_CREAT + os.O_WRONLY + os.O_EXCL
try:
flags += os.O_BINARY
except AttributeError:
# OS_BINARY not available on every platform
pass
with os.fdopen(os.open(target_path, flags, 0o666),
'wb') as target_file:
shutil.copyfileobj(upload.file, target_file)
self.log.info("Plugin %s installed to %s", plugin_filename,
target_path)
# TODO: Validate that the uploaded file is a valid Trac plugin
# Make the environment reset itself on the next request
self.env.config.touch()
def _do_uninstall(self, req):
"""Uninstall a plugin."""
plugin_filename = req.args.get('plugin_filename')
if not plugin_filename:
return
plugin_path = os.path.join(self.env.plugins_dir, plugin_filename)
if not os.path.isfile(plugin_path):
return
self.log.info("Uninstalling plugin %s", plugin_filename)
os.remove(plugin_path)
# Make the environment reset itself on the next request
self.env.config.touch()
def _do_update(self, req):
"""Update component enable state."""
components = req.args.getlist('component')
enabled = req.args.getlist('enable')
added, removed = [], []
# FIXME: this needs to be more intelligent and minimize multiple
# component names to prefix rules
for component in components:
is_enabled = bool(self.env.is_component_enabled(component))
must_enable = component in enabled
if is_enabled != must_enable:
self.config.set('components', component,
'disabled' if is_enabled else 'enabled')
self.log.info("%sabling component %s",
"Dis" if is_enabled else "En", component)
if must_enable:
added.append(component)
else:
removed.append(component)
if added or removed:
def make_list(items):
parts = [item.rsplit('.', 1) for item in items]
return tag.table(tag.tbody(
tag.tr(tag.td(c, class_='trac-name'),
tag.td('(%s.*)' % m, class_='trac-name'))
for m, c in parts), class_='trac-pluglist')
added.sort()
removed.sort()
notices = []
if removed:
msg = ngettext("The following component has been disabled:",
"The following components have been disabled:",
len(removed))
notices.append(tag(msg, make_list(removed)))
if added:
msg = ngettext("The following component has been enabled:",
"The following components have been enabled:",
len(added))
notices.append(tag(msg, make_list(added)))
# set the default value of options for only the enabled components
for component in added:
self.config.set_defaults(component=component)
_save_config(self.config, req, self.log, notices)
def _render_view(self, req):
plugins = get_plugin_info(self.env, include_core=True)
def safe_wiki_to_html(context, text):
try:
return format_to_html(self.env, context, text)
except Exception as e:
self.log.error("Unable to render component documentation: %s",
exception_to_unicode(e, traceback=True))
return tag.pre(text)
data = {
'plugins': plugins, 'show': req.args.get('show'),
'readonly': not os.access(self.env.plugins_dir,
os.F_OK + os.W_OK),
'safe_wiki_to_html': safe_wiki_to_html,
}
return 'admin_plugins.html', data