PyDrocsid/cogs

View on GitHub
general/voice_channel/cog.py

Summary

Maintainability
A
2 hrs
Test Coverage
from __future__ import annotations

import asyncio
import random
from os import getenv
from pathlib import Path
from typing import Optional, Union

from discord import (
    CategoryChannel,
    Embed,
    Forbidden,
    Guild,
    HTTPException,
    Interaction,
    InteractionResponse,
    Member,
    Message,
    NotFound,
    PermissionOverwrite,
    Role,
    TextChannel,
    VoiceChannel,
    VoiceState,
    ui,
)
from discord.abc import Messageable
from discord.ext import commands, tasks
from discord.ext.commands import CommandError, Context, Greedy, UserInputError, guild_only
from discord.ui import Button, View
from discord.utils import format_dt, utcnow

from PyDrocsid.async_thread import GatherAnyError, gather_any
from PyDrocsid.cog import Cog
from PyDrocsid.command import Confirmation, docs, optional_permissions, reply
from PyDrocsid.database import db, db_context, db_wrapper, delete, filter_by, select
from PyDrocsid.embeds import send_long_embed
from PyDrocsid.emojis import name_to_emoji
from PyDrocsid.multilock import MultiLock
from PyDrocsid.prefix import get_prefix
from PyDrocsid.redis import redis
from PyDrocsid.settings import RoleSettings
from PyDrocsid.translations import t
from PyDrocsid.util import check_role_assignable, send_editable_log

from .colors import Colors
from .models import DynChannel, DynChannelMember, DynGroup, RoleVoiceLink
from .permissions import VoiceChannelPermission
from ...contributor import Contributor
from ...pubsub import send_alert, send_to_changelog


tg = t.g
t = t.voice_channel

Overwrites = dict[Union[Member, Role], PermissionOverwrite]


def merge_permission_overwrites(
    overwrites: Overwrites, *args: tuple[Union[Member, Role], PermissionOverwrite]
) -> Overwrites:
    out = {k: PermissionOverwrite.from_pair(*v.pair()) for k, v in overwrites.items()}
    for k, v in args:
        out.setdefault(k, PermissionOverwrite()).update(**{p: q for p, q in v if q is not None})
    return out


def check_voice_permissions(voice_channel: VoiceChannel, role: Role) -> bool:
    view_channel = voice_channel.overwrites_for(role).view_channel
    connect = voice_channel.overwrites_for(role).connect
    if view_channel is None:
        view_channel = role.permissions.view_channel
    if connect is None:
        connect = role.permissions.connect
    return view_channel and connect


async def collect_links(guild: Guild, link_set, channel_id):
    link: RoleVoiceLink
    async for link in await db.stream(filter_by(RoleVoiceLink, voice_channel=channel_id)):
        if role := guild.get_role(link.role):
            link_set.add(role)


async def update_roles(member: Member, *, add: set[Role] = None, remove: set[Role] = None):
    add = add or set()
    remove = remove or set()
    add, remove = add - remove, remove - add

    for role in remove:
        try:
            await member.remove_roles(role)
        except Forbidden:
            await send_alert(member.guild, t.could_not_remove_roles(role.mention, member.mention))

    for role in add:
        try:
            await member.add_roles(role)
        except Forbidden:
            await send_alert(member.guild, t.could_not_add_roles(role.mention, member.mention))


async def get_commands_embed() -> Embed:
    return Embed(
        title=t.dyn_voice_help_title,
        color=Colors.Voice,
        description=t.dyn_voice_help_content(prefix=await get_prefix()),
    )


async def rename_channel(channel: Union[TextChannel, VoiceChannel], name: str):
    try:
        idx, _ = await gather_any(channel.edit(name=name), asyncio.sleep(3))
    except GatherAnyError as e:
        raise e.exception

    if idx:
        raise CommandError(t.rename_rate_limit)


def get_user_role(guild: Guild, channel: DynChannel) -> Optional[Role]:
    return guild.get_role(channel.group.user_role)


def remove_lock_overrides(
    channel: DynChannel,
    voice_channel: VoiceChannel,
    overwrites: Overwrites,
    *,
    keep_members: bool,
    reset_user_role: bool,
) -> Overwrites:
    me = voice_channel.guild.me
    overwrites = {
        k: v
        for k, v in overwrites.items()
        if not isinstance(k, Member) or k == me or (keep_members and k in voice_channel.members)
    }
    if not reset_user_role:
        return overwrites

    user_role = voice_channel.guild.get_role(channel.group.user_role)
    overwrites = merge_permission_overwrites(overwrites, (user_role, PermissionOverwrite(view_channel=True)))
    overwrites[user_role].update(connect=None)
    return overwrites


async def safe_create_voice_channel(
    category: Union[CategoryChannel, Guild], channel: DynChannel, name: str, overwrites: Overwrites
) -> VoiceChannel:
    guild: Guild = category.guild if isinstance(category, CategoryChannel) else category
    user_role: Role = get_user_role(guild, channel)

    try:
        return await category.create_voice_channel(name, overwrites=overwrites)
    except Forbidden:
        pass

    ov = overwrites.pop(user_role, None)
    voice_channel: VoiceChannel = await category.create_voice_channel(name, overwrites=overwrites)

    if ov:
        overwrites[user_role] = ov
        await voice_channel.edit(overwrites=overwrites)

    return voice_channel


class ControlMessage(View):
    def __init__(self, cog: VoiceChannelCog, channel: DynChannel, message: Message):
        super().__init__(timeout=None)
        self.cog = cog
        self.channel = channel
        self.message = message

        _, locked, hidden = self.get_status()

        self.children: list[Button]
        self.children[2].label = t.buttons["unlock" if locked else "lock"]
        self.children[2].emoji = name_to_emoji["unlock" if locked else "lock"]
        self.children[3].label = t.buttons["show" if hidden else "hide"]
        self.children[3].emoji = name_to_emoji["eye" if hidden else "man_detective"]

    async def update(self):
        self.channel = await DynChannel.get(channel_id=self.channel.channel_id)

    def get_status(self):
        voice_channel: VoiceChannel = self.cog.bot.get_channel(self.channel.channel_id)
        user_role = voice_channel.guild.get_role(self.channel.group.user_role)
        locked = self.channel.locked
        hidden = voice_channel.overwrites_for(user_role).view_channel is False
        return voice_channel, locked, hidden

    @ui.button(label=t.buttons.info, emoji=name_to_emoji["information_source"])
    @db_wrapper
    async def info(self, _, interaction: Interaction):
        await self.cog.send_voice_info(interaction.response, self.channel)

    @ui.button(label=t.buttons.help, emoji=name_to_emoji["grey_question"])
    async def help(self, _, interaction: Interaction):
        await interaction.response.send_message(embed=await get_commands_embed(), ephemeral=True)

    @ui.button()
    @db_wrapper
    async def lock(self, _, interaction: Interaction):
        await self.update()
        try:
            await self.cog.check_authorization(self.channel, interaction.user)
        except CommandError:
            await interaction.response.send_message(t.private_voice_owner_required, ephemeral=True)
            return

        voice_channel, locked, _ = self.get_status()
        if not locked:
            await self.cog.lock_channel(interaction.user, self.channel, voice_channel, hide=False)
        else:
            await self.cog.unlock_channel(interaction.user, self.channel, voice_channel)
        await interaction.response.defer()

    @ui.button()
    @db_wrapper
    async def hide(self, _, interaction: Interaction):
        await self.update()
        try:
            await self.cog.check_authorization(self.channel, interaction.user)
        except CommandError:
            await interaction.response.send_message(t.private_voice_owner_required, ephemeral=True)
            return

        voice_channel, _, hidden = self.get_status()
        if not hidden:
            await self.cog.lock_channel(interaction.user, self.channel, voice_channel, hide=True)
        else:
            await self.cog.unhide_channel(interaction.user, self.channel, voice_channel)
        await interaction.response.defer()


class VoiceChannelCog(Cog, name="Voice Channels"):
    CONTRIBUTORS = [
        Contributor.Defelo,
        Contributor.Florian,
        Contributor.wolflu,
        Contributor.TNT2k,
        # vc name lists only:
        Contributor.Scriptim,
        Contributor.MarcelCoding,
        Contributor.Felux,
        Contributor.hackandcode,
    ]

    def __init__(self, team_roles: list[str]):
        self.team_roles: list[str] = team_roles
        self._owners: dict[int, Member] = {}

        self._join_tasks: dict[tuple[Member, VoiceChannel], asyncio.Task] = {}
        self._leave_tasks: dict[tuple[Member, VoiceChannel], asyncio.Task] = {}
        self._channel_lock = MultiLock()
        self._recent_kicks: set[tuple[Member, VoiceChannel]] = set()

        names = getenv("VOICE_CHANNEL_NAMES", "*")
        if names == "*":
            name_lists = [file.name.removesuffix(".txt") for file in Path(__file__).parent.joinpath("names").iterdir()]
        else:
            name_lists = names.split(",")

        self.names: dict[str, set[str]] = {}
        for name_list in name_lists:
            self.names[name_list] = set()
            with Path(__file__).parent.joinpath(f"names/{name_list}.txt").open() as file:
                for name in file.readlines():
                    if name := name.strip():
                        self.names[name_list].add(name)

        self.allowed_names: set[str] = set()
        for path in Path(__file__).parent.joinpath("names").iterdir():
            if not path.name.endswith(".txt"):
                continue

            with path.open() as file:
                for name in file.readlines():
                    if name := name.strip():
                        self.allowed_names.add(name.lower())

    def prepare(self) -> bool:
        return bool(self.names)

    def _get_name_list(self, guild_id: int) -> str:
        r = random.Random(f"{guild_id}{utcnow().date().isoformat()}")
        return r.choice(sorted(self.names))

    def _random_channel_name(self, guild_id: int, avoid: set[str]) -> Optional[str]:
        names = self.names[self._get_name_list(guild_id)]
        allowed = list({*names} - avoid)
        if allowed and random.randrange(100):
            return random.choice(allowed)

        a = "acddfilmmrtneeelnoioanopflofckrztrhetri  pu2aolain  hpkkxo "
        a += "ai  ea     nt  ul      y  st          u          f          f           "
        c = len(b := [*range(13 - 37 + 42 + ((4 > 2) << 4 - 2) >> (1 & 3 & 3 & 7 & ~42))])
        return random.shuffle(b) or next((e for d in b if (e := a[d::c].strip()) not in avoid), None)

    async def get_channel_name(self, guild: Guild) -> str:
        return self._random_channel_name(guild.id, {channel.name for channel in guild.voice_channels})

    async def is_teamler(self, member: Member) -> bool:
        return any(
            team_role in member.roles
            for role_name in self.team_roles
            if (team_role := member.guild.get_role(await RoleSettings.get(role_name))) is not None
        )

    def get_text_channel(self, channel: DynChannel) -> TextChannel:
        if text_channel := self.bot.get_channel(channel.text_id):
            return text_channel

        raise CommandError(t.no_text_channel(f"<#{channel.channel_id}>"))

    async def get_owner(self, channel: DynChannel) -> Optional[Member]:
        if out := self._owners.get(channel.channel_id):
            return out

        self._owners[channel.channel_id] = await self.fetch_owner(channel)
        return self._owners[channel.channel_id]

    async def update_owner(self, channel: DynChannel, new_owner: Optional[Member]) -> Optional[Member]:
        old_owner: Optional[Member] = self._owners.get(channel.channel_id)

        if not new_owner:
            self._owners.pop(channel.channel_id, None)
        elif old_owner != new_owner:
            self._owners[channel.channel_id] = new_owner
            await self.send_voice_msg(channel, t.voice_channel, t.voice_owner_changed(new_owner.mention))

        return new_owner

    async def send_voice_msg(self, channel: DynChannel, title: str, msg: str, force_new_embed: bool = False):
        try:
            text_channel: TextChannel = self.get_text_channel(channel)
        except CommandError as e:
            await send_alert(self.bot.guilds[0], *e.args)
            return

        color = int([Colors.unlocked, Colors.locked][channel.locked])
        try:
            message: Message = await send_editable_log(
                text_channel,
                title,
                "",
                format_dt(now := utcnow(), style="D") + " " + format_dt(now, style="T"),
                msg,
                colour=color,
                force_new_embed=force_new_embed,
                force_new_field=True,
            )
        except Forbidden:
            await send_alert(text_channel.guild, t.could_not_send_voice_msg(text_channel.mention))
            return

        await self.update_control_message(channel, message)

    async def update_control_message(self, channel: DynChannel, message: Message):
        async def clear_view(msg_id):
            try:
                await (await message.channel.fetch_message(msg_id)).edit(view=None)
            except Forbidden:
                await send_alert(message.guild, t.could_not_clear_reactions(message.jump_url, message.channel.mention))
            except NotFound:
                pass

        if (msg := await redis.get(key := f"dynvc_control_message:{channel.text_id}")) and msg != str(message.id):
            asyncio.create_task(clear_view(msg))

        await redis.setex(key, 86400, message.id)

        await message.edit(view=ControlMessage(self, channel, message))

    async def fix_owner(self, channel: DynChannel) -> Optional[Member]:
        voice_channel: VoiceChannel = self.bot.get_channel(channel.channel_id)

        in_voice = {m.id for m in voice_channel.members}
        for m in channel.members:
            if m.member_id in in_voice:
                member = voice_channel.guild.get_member(m.member_id)
                if member.bot:
                    continue

                channel.owner_id = m.id
                return await self.update_owner(channel, member)

        channel.owner_id = None
        return await self.update_owner(channel, None)

    async def fetch_owner(self, channel: DynChannel) -> Optional[Member]:
        voice_channel: VoiceChannel = self.bot.get_channel(channel.channel_id)

        if channel.owner_override and any(channel.owner_override == member.id for member in voice_channel.members):
            return voice_channel.guild.get_member(channel.owner_override)

        owner: Optional[DynChannelMember] = await db.get(DynChannelMember, id=channel.owner_id)
        if owner and any(owner.member_id == member.id for member in voice_channel.members):
            return voice_channel.guild.get_member(owner.member_id)

        return await self.fix_owner(channel)

    async def check_authorization(self, channel: DynChannel, member: Member):
        if await VoiceChannelPermission.override_owner.check_permissions(member):
            return

        if await self.get_owner(channel) == member:
            return

        raise CommandError(t.private_voice_owner_required)

    async def get_channel(
        self, member: Member, *, check_owner: bool, check_locked: bool = False
    ) -> tuple[DynChannel, VoiceChannel]:
        if member.voice is None or member.voice.channel is None:
            raise CommandError(t.not_in_voice)

        voice_channel: VoiceChannel = member.voice.channel
        channel: Optional[DynChannel] = await db.get(
            DynChannel, [DynChannel.group, DynGroup.channels], DynChannel.members, channel_id=voice_channel.id
        )
        if not channel:
            raise CommandError(t.not_in_voice)

        if check_locked and not channel.locked:
            raise CommandError(t.channel_not_locked)

        if check_owner:
            await self.check_authorization(channel, member)

        return channel, voice_channel

    async def on_ready(self):
        guild: Guild = self.bot.guilds[0]

        role_voice_links: dict[Role, list[VoiceChannel]] = {}

        link: RoleVoiceLink
        async for link in await db.stream(select(RoleVoiceLink)):
            role: Optional[Role] = guild.get_role(link.role)
            if role is None:
                await db.delete(link)
                continue

            if link.voice_channel.isnumeric():
                voice: Optional[VoiceChannel] = guild.get_channel(int(link.voice_channel))
                if not voice:
                    await db.delete(link)
                else:
                    role_voice_links.setdefault(role, []).append(voice)
            else:
                group: Optional[DynGroup] = await db.get(DynGroup, DynGroup.channels, id=link.voice_channel)
                if not group:
                    await db.delete(link)
                    continue

                for channel in group.channels:
                    if voice := guild.get_channel(channel.channel_id):
                        role_voice_links.setdefault(role, []).append(voice)

        role_changes: dict[Member, tuple[set[Role], set[Role]]] = {}
        for role, channels in role_voice_links.items():
            members = set()
            for channel in channels:
                members.update(channel.members)
            for member in members:
                if role not in member.roles:
                    role_changes.setdefault(member, (set(), set()))[0].add(role)
            for member in role.members:
                if member not in members:
                    role_changes.setdefault(member, (set(), set()))[1].add(role)

        for member, (add, remove) in role_changes.items():
            asyncio.create_task(update_roles(member, add=add, remove=remove))

        try:
            self.vc_loop.start()
        except RuntimeError:
            self.vc_loop.restart()

    @tasks.loop(minutes=30)
    @db_wrapper
    async def vc_loop(self):
        guild: Guild = self.bot.guilds[0]

        channel: DynChannel
        async for channel in await db.stream(select(DynChannel)):
            voice_channel: Optional[VoiceChannel] = guild.get_channel(channel.channel_id)
            if not voice_channel:
                await db.delete(channel)
                continue

            if not voice_channel.members:
                asyncio.create_task(voice_channel.edit(name=await self.get_channel_name(guild)))

    async def lock_channel(self, member: Member, channel: DynChannel, voice_channel: VoiceChannel, *, hide: bool):
        locked = channel.locked
        channel.locked = True
        member_overwrites = [
            (member, PermissionOverwrite(view_channel=True, connect=True)) for member in voice_channel.members
        ]
        overwrites = merge_permission_overwrites(
            voice_channel.overwrites,
            (
                voice_channel.guild.get_role(channel.group.user_role),
                PermissionOverwrite(view_channel=not hide, connect=False),
            ),
            *member_overwrites,
        )

        try:
            await voice_channel.edit(overwrites=overwrites)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(voice_channel.mention))

        text_channel = self.get_text_channel(channel)
        try:
            await text_channel.edit(overwrites=merge_permission_overwrites(text_channel.overwrites, *member_overwrites))
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(text_channel.mention))

        if hide:
            await self.send_voice_msg(channel, t.voice_channel, t.hidden(member.mention), force_new_embed=not locked)
        else:
            await self.send_voice_msg(channel, t.voice_channel, t.locked(member.mention), force_new_embed=True)

    async def unlock_channel(
        self, member: Optional[Member], channel: DynChannel, voice_channel: VoiceChannel, *, skip_text: bool = False
    ):
        channel.locked = False
        overwrites = remove_lock_overrides(
            channel, voice_channel, voice_channel.overwrites, keep_members=False, reset_user_role=True
        )

        try:
            await voice_channel.edit(overwrites=overwrites)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(voice_channel.mention))

        if skip_text:
            return

        text_channel = self.get_text_channel(channel)
        try:
            await text_channel.edit(
                overwrites=remove_lock_overrides(
                    channel, voice_channel, text_channel.overwrites, keep_members=True, reset_user_role=False
                )
            )
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(text_channel.mention))

        await self.send_voice_msg(channel, t.voice_channel, t.unlocked(member.mention), force_new_embed=True)

    async def unhide_channel(self, member: Member, channel: DynChannel, voice_channel: VoiceChannel):
        user_role = voice_channel.guild.get_role(channel.group.user_role)

        try:
            await voice_channel.set_permissions(user_role, view_channel=True, connect=False)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(voice_channel.mention))

        await self.send_voice_msg(channel, t.voice_channel, t.visible(member.mention))

    async def add_to_channel(self, channel: DynChannel, voice_channel: VoiceChannel, member: Member):
        overwrite = PermissionOverwrite(view_channel=True, connect=True)
        try:
            await voice_channel.set_permissions(member, overwrite=overwrite)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(voice_channel.mention))

        text_channel = self.get_text_channel(channel)
        try:
            await text_channel.set_permissions(member, overwrite=overwrite)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(text_channel.mention))

        await self.send_voice_msg(channel, t.voice_channel, t.user_added(member.mention))

    async def remove_from_channel(self, channel: DynChannel, voice_channel: VoiceChannel, member: Member):
        try:
            await voice_channel.set_permissions(member, overwrite=None)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(voice_channel.mention))

        text_channel = self.get_text_channel(channel)
        try:
            await text_channel.set_permissions(member, overwrite=None)
        except Forbidden:
            raise CommandError(t.could_not_overwrite_permissions(text_channel.mention))

        await db.exec(delete(DynChannelMember).filter_by(channel_id=voice_channel.id, member_id=member.id))
        is_owner = member == await self.get_owner(channel)
        if member.voice and member.voice.channel == voice_channel:
            try:
                await member.move_to(None)
            except Forbidden:
                await send_alert(member.guild, t.could_not_kick(member.mention, voice_channel.mention))
                is_owner = False
            else:
                self._recent_kicks.add((member, voice_channel))

        await self.send_voice_msg(channel, t.voice_channel, t.user_removed(member.mention))
        if is_owner:
            await self.fix_owner(channel)

    async def member_join(self, member: Member, voice_channel: VoiceChannel):
        channel: Optional[DynChannel] = await DynChannel.get(channel_id=voice_channel.id)
        if not channel:
            return

        guild: Guild = voice_channel.guild
        category: Union[CategoryChannel, Guild] = voice_channel.category or guild

        text_channel: Optional[TextChannel] = self.bot.get_channel(channel.text_id)
        if not text_channel:
            overwrites = {
                guild.default_role: PermissionOverwrite(read_messages=False, connect=False),
                guild.me: PermissionOverwrite(read_messages=True, manage_channels=True),
            }
            for role_name in self.team_roles:
                if not (team_role := guild.get_role(await RoleSettings.get(role_name))):
                    continue
                if check_voice_permissions(voice_channel, team_role):
                    overwrites[team_role] = PermissionOverwrite(read_messages=True)
            try:
                text_channel = await category.create_text_channel(
                    voice_channel.name, topic=t.text_channel_for(voice_channel.mention), overwrites=overwrites
                )
            except (Forbidden, HTTPException):
                await send_alert(voice_channel.guild, t.could_not_create_text_channel(voice_channel.mention))
                return

            channel.text_id = text_channel.id
            await text_channel.send(embed=await get_commands_embed())
            await self.send_voice_msg(channel, t.voice_channel, t.dyn_voice_created(member.mention))

        try:
            await text_channel.set_permissions(member, overwrite=PermissionOverwrite(read_messages=True))
        except Forbidden:
            await send_alert(voice_channel.guild, t.could_not_overwrite_permissions(text_channel.mention))

        await self.send_voice_msg(channel, t.voice_channel, t.dyn_voice_joined(member.mention))

        if channel.locked and member not in voice_channel.overwrites:
            try:
                await self.add_to_channel(channel, voice_channel, member)
            except CommandError as e:
                await send_alert(voice_channel.guild, *e.args)

        channel_member: Optional[DynChannelMember] = await db.get(
            DynChannelMember, member_id=member.id, channel_id=voice_channel.id
        )
        if not channel_member:
            channel.members.append(channel_member := await DynChannelMember.create(member.id, voice_channel.id))

        owner: Optional[DynChannelMember] = await db.get(DynChannelMember, id=channel.owner_id)
        update_owner = False
        if (not owner or channel_member.timestamp < owner.timestamp) and channel.owner_id != channel_member.id:
            if not member.bot:
                channel.owner_id = channel_member.id
                update_owner = True
        if update_owner or channel.owner_override == member.id:
            await self.update_owner(channel, await self.fetch_owner(channel))

        if all(c.members for chnl in channel.group.channels if (c := self.bot.get_channel(chnl.channel_id))):
            overwrites = voice_channel.overwrites
            if channel.locked:
                overwrites = remove_lock_overrides(
                    channel, voice_channel, overwrites, keep_members=False, reset_user_role=True
                )
            try:
                new_channel = await safe_create_voice_channel(
                    category, channel, await self.get_channel_name(guild), overwrites
                )
            except (Forbidden, HTTPException):
                await send_alert(voice_channel.guild, t.could_not_create_voice_channel)
            else:
                await DynChannel.create(new_channel.id, channel.group_id)

    async def member_leave(self, member: Member, voice_channel: VoiceChannel):
        channel: Optional[DynChannel] = await DynChannel.get(channel_id=voice_channel.id)
        if not channel:
            return

        text_channel: Optional[TextChannel] = self.bot.get_channel(channel.text_id)
        if not text_channel:
            await send_alert(voice_channel.guild, t.no_text_channel(f"<#{channel.channel_id}>"))

        if text_channel and not channel.locked:
            try:
                await text_channel.set_permissions(member, overwrite=None)
            except Forbidden:
                await send_alert(voice_channel.guild, t.could_not_overwrite_permissions(text_channel.mention))

        if text_channel:
            await self.send_voice_msg(channel, t.voice_channel, t.dyn_voice_left(member.mention))

        owner: Optional[DynChannelMember] = await db.get(DynChannelMember, id=channel.owner_id)
        if owner and owner.member_id == member.id or channel.owner_override == member.id:
            await self.fix_owner(channel)

        if any(not m.bot for m in voice_channel.members):
            return

        async def delete_text():
            if text_channel:
                try:
                    await text_channel.delete()
                except Forbidden:
                    await send_alert(text_channel.guild, t.could_not_delete_channel(text_channel.mention))
                    return

        async def delete_voice():
            channel.owner_id = None
            channel.owner_override = None
            await db.exec(delete(DynChannelMember).filter_by(channel_id=voice_channel.id))
            channel.members.clear()

            try:
                await voice_channel.delete()
            except Forbidden:
                await send_alert(voice_channel.guild, t.could_not_delete_channel(voice_channel.mention))
                return
            else:
                await db.delete(channel)

        async def create_new_channel() -> bool:
            # check if there is at least one empty channel
            if not all(
                any(not m.bot for m in c.members)
                for chnl in channel.group.channels
                if chnl.channel_id != channel.channel_id and (c := self.bot.get_channel(chnl.channel_id))
            ):
                return True

            guild: Guild = voice_channel.guild
            category: Union[CategoryChannel, Guild] = voice_channel.category or guild

            overwrites = voice_channel.overwrites
            if channel.locked:
                overwrites = remove_lock_overrides(
                    channel, voice_channel, overwrites, keep_members=False, reset_user_role=True
                )
            try:
                new_channel = await safe_create_voice_channel(
                    category, channel, await self.get_channel_name(guild), overwrites
                )
            except (Forbidden, HTTPException):
                await send_alert(guild, t.could_not_create_voice_channel)
                return False
            else:
                await DynChannel.create(new_channel.id, channel.group_id)
                return True

        await delete_text()
        if await create_new_channel():
            await delete_voice()

    async def on_voice_state_update(self, member: Member, before: VoiceState, after: VoiceState):
        if before.channel == after.channel:
            return

        async def delayed(delay, key, func, delay_callback, *args):
            await asyncio.sleep(delay)
            delay_callback()
            async with self._channel_lock[key]:
                async with db_context():
                    return await func(*args)

        async def create_task(delay, c, task_dict, cancel_dict, func):
            dyn_channel: Optional[DynChannel] = await DynChannel.get(channel_id=channel.id)
            if not dyn_channel:
                return

            await collect_links(member.guild, roles := set(), dyn_channel.group_id)
            if func == self.member_leave:
                await update_roles(member, remove=roles)
            else:
                await update_roles(member, add=roles)

            key = member, c
            if task := cancel_dict.pop(key, None):
                task.cancel()
            elif key not in task_dict:
                task_dict[key] = asyncio.create_task(
                    delayed(delay, dyn_channel.group_id, func, lambda: task_dict.pop(key, None), *key)
                )

        remove: set[Role] = set()
        add: set[Role] = set()

        if channel := before.channel:
            await collect_links(channel.guild, remove, str(channel.id))
            if (k := (member, channel)) in self._recent_kicks:
                self._recent_kicks.remove(k)
                await self.member_leave(member, channel)
            else:
                await create_task(5, channel, self._leave_tasks, self._join_tasks, self.member_leave)

        if channel := after.channel:
            await collect_links(channel.guild, add, str(channel.id))
            await create_task(1, channel, self._join_tasks, self._leave_tasks, self.member_join)

        await update_roles(member, add=add, remove=remove)

    @commands.group(aliases=["vc"])
    @guild_only()
    @docs(t.commands.voice)
    async def voice(self, ctx: Context):
        if ctx.invoked_subcommand is None:
            raise UserInputError

    @voice.group(name="dynamic", aliases=["dyn", "d"])
    @VoiceChannelPermission.dyn_read.check
    @docs(t.commands.voice_dynamic)
    async def voice_dynamic(self, ctx: Context):
        if len(ctx.message.content.lstrip(ctx.prefix).split()) > 2:
            if ctx.invoked_subcommand is None:
                raise UserInputError
            return

        embed = Embed(title=t.voice_channel, colour=Colors.Voice)

        group: DynGroup
        async for group in await db.stream(select(DynGroup, DynGroup.channels)):
            channels: list[tuple[str, VoiceChannel, Optional[TextChannel]]] = []
            for channel in group.channels:
                voice_channel: Optional[VoiceChannel] = ctx.guild.get_channel(channel.channel_id)
                text_channel: Optional[TextChannel] = ctx.guild.get_channel(channel.text_id)
                if not voice_channel:
                    await db.delete(channel)
                    continue

                if channel.locked:
                    if voice_channel.overwrites_for(voice_channel.guild.get_role(channel.group.user_role)).view_channel:
                        icon = "lock"
                    else:
                        icon = "man_detective"
                else:
                    icon = "unlock"

                channels.append((icon, voice_channel, text_channel))

            if not channels:
                await db.delete(group)
                continue

            embed.add_field(
                name=t.cnt_channels(cnt=len(channels)),
                value="\n".join(f":{icon}: {vc.mention} {txt.mention if txt else ''}" for icon, vc, txt in channels),
                inline=False,
            )

        if not embed.fields:
            embed.colour = Colors.error
            embed.description = t.no_dyn_group
        await send_long_embed(ctx, embed, paginate=True)

    @voice_dynamic.command(name="add", aliases=["a", "+"])
    @VoiceChannelPermission.dyn_write.check
    @docs(t.commands.voice_dynamic_add)
    async def voice_dynamic_add(self, ctx: Context, user_role: Optional[Role], *, voice_channel: VoiceChannel):
        everyone = voice_channel.guild.default_role
        user_role = user_role or everyone
        if not check_voice_permissions(voice_channel, user_role):
            raise CommandError(t.invalid_user_role(user_role.mention if user_role != everyone else "@everyone"))

        if await db.exists(filter_by(DynChannel, channel_id=voice_channel.id)):
            raise CommandError(t.dyn_group_already_exists)

        try:
            await voice_channel.edit(name=await self.get_channel_name(voice_channel.guild))
        except Forbidden:
            raise CommandError(t.cannot_edit)

        await DynGroup.create(voice_channel.id, user_role.id)
        embed = Embed(title=t.voice_channel, colour=Colors.Voice, description=t.dyn_group_created)
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_dyn_group_created)

    @voice_dynamic.command(name="remove", aliases=["del", "d", "r", "-"])
    @VoiceChannelPermission.dyn_write.check
    @docs(t.commands.voice_dynamic_remove)
    async def voice_dynamic_remove(self, ctx: Context, *, voice_channel: VoiceChannel):
        channel: Optional[DynChannel] = await db.get(
            DynChannel, [DynChannel.group, DynGroup.channels], DynChannel.members, channel_id=voice_channel.id
        )
        if not channel:
            raise CommandError(t.dyn_group_not_found)

        for c in channel.group.channels:
            if (x := self.bot.get_channel(c.channel_id)) and c.channel_id != voice_channel.id:
                try:
                    await x.delete()
                except Forbidden:
                    raise CommandError(t.could_not_delete_channel(x.mention))
            if x := self.bot.get_channel(c.text_id):
                try:
                    await x.delete()
                except Forbidden:
                    raise CommandError(t.could_not_delete_channel(x.mention))

        await db.delete(channel.group)
        embed = Embed(title=t.voice_channel, colour=Colors.Voice, description=t.dyn_group_removed)
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_dyn_group_removed)

    @voice.command(name="help", aliases=["commands", "c"])
    @docs(t.commands.help)
    async def voice_help(self, ctx: Context):
        message = await reply(ctx, embed=await get_commands_embed())

        if channel := await DynChannel.get(text_id=ctx.channel.id):
            await self.update_control_message(channel, message)

    @voice.command(name="info", aliases=["i"])
    @docs(t.commands.voice_info)
    async def voice_info(self, ctx: Context, *, voice_channel: Optional[Union[VoiceChannel, Member]] = None):
        if not voice_channel:
            if channel := await db.get(DynChannel, text_id=ctx.channel.id):
                voice_channel = self.bot.get_channel(channel.channel_id)

        if not isinstance(voice_channel, VoiceChannel):
            member = voice_channel or ctx.author
            if not member.voice:
                if not voice_channel:
                    raise CommandError(t.not_in_voice)
                if await self.is_teamler(ctx.author):
                    raise CommandError(t.user_not_in_voice)
                raise CommandError(tg.permission_denied)
            voice_channel = member.voice.channel

        channel: Optional[DynChannel] = await db.get(
            DynChannel, [DynChannel.group, DynGroup.channels], DynChannel.members, channel_id=voice_channel.id
        )
        if not channel:
            raise CommandError(t.dyn_group_not_found)

        if not voice_channel.permissions_for(ctx.author).connect:
            raise CommandError(tg.permission_denied)

        await self.send_voice_info(ctx, channel)

    async def send_voice_info(self, target: Messageable | InteractionResponse, channel: DynChannel):
        voice_channel: VoiceChannel = self.bot.get_channel(channel.channel_id)
        if channel.locked:
            if voice_channel.overwrites_for(voice_channel.guild.get_role(channel.group.user_role)).view_channel:
                state = t.state.locked
            else:
                state = t.state.hidden
        else:
            state = t.state.unlocked

        embed = Embed(title=t.voice_info, color=[Colors.unlocked, Colors.locked][channel.locked])
        embed.add_field(name=t.voice_name, value=voice_channel.name)
        embed.add_field(name=t.voice_state, value=state)

        if owner := await self.get_owner(channel):
            embed.add_field(name=t.voice_owner, value=owner.mention)

        out = []

        active = members = set(voice_channel.members)
        if channel.locked:
            members = {m for m in voice_channel.overwrites if isinstance(m, Member)}

        join_map = {m.member_id: m.timestamp.timestamp() for m in channel.members}
        members = sorted(members, key=lambda m: -1 if m.id == channel.owner_override else join_map.get(m.id, 1e1337))

        for member in members:
            if member in active:
                out.append(f":small_orange_diamond: {member.mention}")
            else:
                out.append(f":small_blue_diamond: {member.mention}")

        if channel.locked:
            name = t.voice_members.locked(len(active), cnt=len(members))
        else:
            name = t.voice_members.unlocked(cnt=len(members))
        embed.add_field(name=name, value="\n".join(out), inline=False)

        messages = await send_long_embed(target, embed, paginate=True)
        if isinstance(target, InteractionResponse):
            return
        if channel := await DynChannel.get(text_id=channel.text_id):
            await self.update_control_message(channel, messages[-1])

    @voice.command(name="rename")
    @optional_permissions(VoiceChannelPermission.dyn_rename, VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_rename)
    async def voice_rename(self, ctx: Context, *, name: Optional[str]):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)
        text_channel: TextChannel = self.get_text_channel(channel)
        old_name = voice_channel.name

        if not name:
            name = await self.get_channel_name(ctx.guild)
        elif name.lower() not in self.allowed_names:
            if not await VoiceChannelPermission.dyn_rename.check_permissions(ctx.author):
                raise CommandError(t.no_custom_name)

        if any(c.id != voice_channel.id and name == c.name for c in voice_channel.guild.voice_channels):
            if not await Confirmation().run(ctx, t.rename_description):
                return

        try:
            await rename_channel(voice_channel, name)
            await rename_channel(text_channel, name)
        except Forbidden:
            raise CommandError(t.cannot_edit)
        except HTTPException:
            raise CommandError(t.rename_failed)

        await self.send_voice_msg(channel, t.voice_channel, t.renamed(ctx.author.mention, old_name, name))
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="owner", aliases=["o"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_owner)
    async def voice_owner(self, ctx: Context, member: Member):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)

        if member not in voice_channel.members:
            raise CommandError(t.user_not_in_this_channel)
        if member.bot:
            raise CommandError(t.bot_no_owner_transfer)

        if await self.get_owner(channel) == member:
            raise CommandError(t.already_owner(member.mention))

        channel.owner_override = member.id
        await self.update_owner(channel, member)
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="lock", aliases=["l"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_lock)
    async def voice_lock(self, ctx: Context):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)
        if channel.locked:
            raise CommandError(t.already_locked)

        await self.lock_channel(ctx.author, channel, voice_channel, hide=False)
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="hide", aliases=["h"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_hide)
    async def voice_hide(self, ctx: Context):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)
        user_role = voice_channel.guild.get_role(channel.group.user_role)
        locked = channel.locked
        if locked and not voice_channel.overwrites_for(user_role).view_channel:
            raise CommandError(t.already_hidden)

        await self.lock_channel(ctx.author, channel, voice_channel, hide=True)
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="show", aliases=["s", "unhide"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_show)
    async def voice_show(self, ctx: Context):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)
        user_role = voice_channel.guild.get_role(channel.group.user_role)
        if not channel.locked or voice_channel.overwrites_for(user_role).view_channel:
            raise CommandError(t.not_hidden)

        await self.unhide_channel(ctx.author, channel, voice_channel)
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="unlock", aliases=["u"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_unlock)
    async def voice_unlock(self, ctx: Context):
        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True)
        if not channel.locked:
            raise CommandError(t.already_unlocked)

        await self.unlock_channel(ctx.author, channel, voice_channel)
        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="add", aliases=["a", "+", "invite"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_add)
    async def voice_add(self, ctx: Context, *members: Greedy[Member]):
        if not members:
            raise UserInputError

        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True, check_locked=True)

        if self.bot.user in members:
            raise CommandError(t.cannot_add_user(self.bot.user.mention))

        for member in set(members):
            await self.add_to_channel(channel, voice_channel, member)

        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.command(name="remove", aliases=["r", "-", "kick", "k"])
    @optional_permissions(VoiceChannelPermission.override_owner)
    @docs(t.commands.voice_remove)
    async def voice_remove(self, ctx: Context, *members: Greedy[Member]):
        if not members:
            raise UserInputError

        channel, voice_channel = await self.get_channel(ctx.author, check_owner=True, check_locked=True)

        members = set(members)
        if self.bot.user in members:
            raise CommandError(t.cannot_remove_user(self.bot.user.mention))
        if ctx.author in members:
            raise CommandError(t.cannot_remove_user(ctx.author.mention))

        team_roles: list[Role] = [
            team_role
            for role_name in self.team_roles
            if (team_role := ctx.guild.get_role(await RoleSettings.get(role_name)))
            if check_voice_permissions(voice_channel, team_role)
        ]
        for member in members:
            if member not in voice_channel.overwrites:
                raise CommandError(t.not_added(member.mention))
            if any(role in member.roles for role in team_roles):
                raise CommandError(t.cannot_remove_user(member.mention))

        for member in members:
            await self.remove_from_channel(channel, voice_channel, member)

        await ctx.message.add_reaction(name_to_emoji["white_check_mark"])

    @voice.group(name="role_links", aliases=["rl"])
    @VoiceChannelPermission.link_read.check
    @docs(t.commands.voice_link)
    async def voice_link(self, ctx: Context):
        if len(ctx.message.content.lstrip(ctx.prefix).split()) > 2:
            if ctx.invoked_subcommand is None:
                raise UserInputError
            return

        guild: Guild = ctx.guild

        out: list[tuple[VoiceChannel, Role]] = []
        link: RoleVoiceLink
        async for link in await db.stream(select(RoleVoiceLink)):
            role: Optional[Role] = guild.get_role(link.role)
            if role is None:
                await db.delete(link)
                continue

            if link.voice_channel.isnumeric():
                voice: Optional[VoiceChannel] = guild.get_channel(int(link.voice_channel))
                if not voice:
                    await db.delete(link)
                    continue
                out.append((voice, role))
            else:
                group: Optional[DynGroup] = await db.get(DynGroup, DynGroup.channels, id=link.voice_channel)
                if not group:
                    await db.delete(link)
                    continue

                for channel in group.channels:
                    if voice := guild.get_channel(channel.channel_id):
                        out.append((voice, role))

        embed = Embed(title=t.voice_channel, color=Colors.Voice)
        embed.description = "\n".join(
            f"{voice.mention} (`{voice.id}`) -> {role.mention} (`{role.id}`)" for voice, role in out
        )

        if not out:
            embed.colour = Colors.error
            embed.description = t.no_links_created

        await send_long_embed(ctx, embed)

    def gather_members(self, channel: Optional[DynChannel], voice_channel: VoiceChannel) -> set[Member]:
        members: set[Member] = set(voice_channel.members)
        if not channel:
            return members

        for dyn_channel in channel.group.channels:
            if x := self.bot.get_channel(dyn_channel.channel_id):
                members.update(x.members)

        return members

    @voice_link.command(name="add", aliases=["a", "+"])
    @VoiceChannelPermission.link_write.check
    @docs(t.commands.voice_link_add)
    async def voice_link_add(self, ctx: Context, voice_channel: VoiceChannel, *, role: Role):
        if channel := await DynChannel.get(channel_id=voice_channel.id):
            voice_id = channel.group_id
        else:
            voice_id = str(voice_channel.id)

        if await db.exists(filter_by(RoleVoiceLink, role=role.id, voice_channel=voice_id)):
            raise CommandError(t.link_already_exists)

        check_role_assignable(role)

        await RoleVoiceLink.create(role.id, voice_id)

        for m in self.gather_members(channel, voice_channel):
            asyncio.create_task(update_roles(m, add={role}))

        embed = Embed(title=t.voice_channel, colour=Colors.Voice, description=t.link_created(voice_channel, role.id))
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_link_created(voice_channel, role))

    @voice_link.command(name="remove", aliases=["del", "r", "d", "-"])
    @VoiceChannelPermission.link_write.check
    @docs(t.commands.voice_link_remove)
    async def voice_link_remove(self, ctx: Context, voice_channel: VoiceChannel, *, role: Role):
        if channel := await DynChannel.get(channel_id=voice_channel.id):
            voice_id = channel.group_id
        else:
            voice_id = str(voice_channel.id)

        link: Optional[RoleVoiceLink] = await db.get(RoleVoiceLink, role=role.id, voice_channel=voice_id)
        if not link:
            raise CommandError(t.link_not_found)

        await db.delete(link)

        for m in self.gather_members(channel, voice_channel):
            asyncio.create_task(update_roles(m, remove={role}))

        embed = Embed(title=t.voice_channel, colour=Colors.Voice, description=t.link_deleted)
        await reply(ctx, embed=embed)
        await send_to_changelog(ctx.guild, t.log_link_deleted(voice_channel, role))