leandrotoledo/python-telegram-bot

View on GitHub
telegram/ext/_updater.py

Summary

Maintainability
C
1 day
Test Coverage
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2024
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program.  If not, see [http://www.gnu.org/licenses/].
"""This module contains the class Updater, which tries to make creating Telegram bots intuitive."""
import asyncio
import contextlib
import ssl
from pathlib import Path
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncContextManager,
    Callable,
    Coroutine,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
)

from telegram._utils.defaultvalue import DEFAULT_80, DEFAULT_IP, DEFAULT_NONE, DefaultValue
from telegram._utils.logging import get_logger
from telegram._utils.repr import build_repr_with_selected_attrs
from telegram._utils.types import DVType, ODVInput
from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut

try:
    from telegram.ext._utils.webhookhandler import WebhookAppClass, WebhookServer

    WEBHOOKS_AVAILABLE = True
except ImportError:
    WEBHOOKS_AVAILABLE = False

if TYPE_CHECKING:
    from socket import socket

    from telegram import Bot


_UpdaterType = TypeVar("_UpdaterType", bound="Updater")  # pylint: disable=invalid-name
_LOGGER = get_logger(__name__)


class Updater(AsyncContextManager["Updater"]):
    """This class fetches updates for the bot either via long polling or by starting a webhook
    server. Received updates are enqueued into the :attr:`update_queue` and may be fetched from
    there to handle them appropriately.

    Instances of this class can be used as asyncio context managers, where

    .. code:: python

        async with updater:
            # code

    is roughly equivalent to

    .. code:: python

        try:
            await updater.initialize()
            # code
        finally:
            await updater.shutdown()

    .. seealso:: :meth:`__aenter__` and :meth:`__aexit__`.

    .. seealso:: :wiki:`Architecture Overview <Architecture>`,
        :wiki:`Builder Pattern <Builder-Pattern>`

    .. versionchanged:: 20.0

        * Removed argument and attribute ``user_sig_handler``
        * The only arguments and attributes are now :attr:`bot` and :attr:`update_queue` as now
          the sole purpose of this class is to fetch updates. The entry point to a PTB application
          is now :class:`telegram.ext.Application`.

    Args:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    Attributes:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    """

    __slots__ = (
        "__lock",
        "__polling_cleanup_cb",
        "__polling_task",
        "__polling_task_stop_event",
        "_httpd",
        "_initialized",
        "_last_update_id",
        "_running",
        "bot",
        "update_queue",
    )

    def __init__(
        self,
        bot: "Bot",
        update_queue: "asyncio.Queue[object]",
    ):
        self.bot: Bot = bot
        self.update_queue: asyncio.Queue[object] = update_queue

        self._last_update_id = 0
        self._running = False
        self._initialized = False
        self._httpd: Optional[WebhookServer] = None
        self.__lock = asyncio.Lock()
        self.__polling_task: Optional[asyncio.Task] = None
        self.__polling_task_stop_event: asyncio.Event = asyncio.Event()
        self.__polling_cleanup_cb: Optional[Callable[[], Coroutine[Any, Any, None]]] = None

    async def __aenter__(self: _UpdaterType) -> _UpdaterType:  # noqa: PYI019
        """
        |async_context_manager| :meth:`initializes <initialize>` the Updater.

        Returns:
            The initialized Updater instance.

        Raises:
            :exc:`Exception`: If an exception is raised during initialization, :meth:`shutdown`
                is called in this case.
        """
        try:
            await self.initialize()
            return self
        except Exception as exc:
            await self.shutdown()
            raise exc

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """|async_context_manager| :meth:`shuts down <shutdown>` the Updater."""
        # Make sure not to return `True` so that exceptions are not suppressed
        # https://docs.python.org/3/reference/datamodel.html?#object.__aexit__
        await self.shutdown()

    def __repr__(self) -> str:
        """Give a string representation of the updater in the form ``Updater[bot=...]``.

        As this class doesn't implement :meth:`object.__str__`, the default implementation
        will be used, which is equivalent to :meth:`__repr__`.

        Returns:
            :obj:`str`
        """
        return build_repr_with_selected_attrs(self, bot=self.bot)

    @property
    def running(self) -> bool:
        return self._running

    async def initialize(self) -> None:
        """Initializes the Updater & the associated :attr:`bot` by calling
        :meth:`telegram.Bot.initialize`.

        .. seealso::
            :meth:`shutdown`
        """
        if self._initialized:
            _LOGGER.debug("This Updater is already initialized.")
            return

        await self.bot.initialize()
        self._initialized = True

    async def shutdown(self) -> None:
        """
        Shutdown the Updater & the associated :attr:`bot` by calling :meth:`telegram.Bot.shutdown`.

        .. seealso::
            :meth:`initialize`

        Raises:
            :exc:`RuntimeError`: If the updater is still running.
        """
        if self.running:
            raise RuntimeError("This Updater is still running!")

        if not self._initialized:
            _LOGGER.debug("This Updater is already shut down. Returning.")
            return

        await self.bot.shutdown()
        self._initialized = False
        _LOGGER.debug("Shut down of Updater complete")

    async def start_polling(
        self,
        poll_interval: float = 0.0,
        timeout: int = 10,
        bootstrap_retries: int = -1,
        read_timeout: ODVInput[float] = DEFAULT_NONE,
        write_timeout: ODVInput[float] = DEFAULT_NONE,
        connect_timeout: ODVInput[float] = DEFAULT_NONE,
        pool_timeout: ODVInput[float] = DEFAULT_NONE,
        allowed_updates: Optional[List[str]] = None,
        drop_pending_updates: Optional[bool] = None,
        error_callback: Optional[Callable[[TelegramError], None]] = None,
    ) -> "asyncio.Queue[object]":
        """Starts polling updates from Telegram.

        .. versionchanged:: 20.0
            Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates`.

        Args:
            poll_interval (:obj:`float`, optional): Time to wait between polling updates from
                Telegram in seconds. Default is ``0.0``.
            timeout (:obj:`int`, optional): Passed to
                :paramref:`telegram.Bot.get_updates.timeout`. Defaults to ``10`` seconds.
            bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
                :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

                * < 0 - retry indefinitely (default)
                *   0 - no retries
                * > 0 - retry up to X times
            read_timeout (:obj:`float`, optional): Value to pass to
                :paramref:`telegram.Bot.get_updates.read_timeout`. Defaults to
                :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

                .. versionchanged:: 20.7
                    Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE` instead of
                    ``2``.
                .. deprecated:: 20.7
                    Deprecated in favor of setting the timeout via
                    :meth:`telegram.ext.ApplicationBuilder.get_updates_read_timeout` or
                    :paramref:`telegram.Bot.get_updates_request`.
            write_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
                :paramref:`telegram.Bot.get_updates.write_timeout`. Defaults to
                :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

                .. deprecated:: 20.7
                    Deprecated in favor of setting the timeout via
                    :meth:`telegram.ext.ApplicationBuilder.get_updates_write_timeout` or
                    :paramref:`telegram.Bot.get_updates_request`.
            connect_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
                :paramref:`telegram.Bot.get_updates.connect_timeout`. Defaults to
                :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

                .. deprecated:: 20.7
                    Deprecated in favor of setting the timeout via
                    :meth:`telegram.ext.ApplicationBuilder.get_updates_connect_timeout` or
                    :paramref:`telegram.Bot.get_updates_request`.
            pool_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
                :paramref:`telegram.Bot.get_updates.pool_timeout`. Defaults to
                :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

                .. deprecated:: 20.7
                    Deprecated in favor of setting the timeout via
                    :meth:`telegram.ext.ApplicationBuilder.get_updates_pool_timeout` or
                    :paramref:`telegram.Bot.get_updates_request`.
            allowed_updates (List[:obj:`str`], optional): Passed to
                :meth:`telegram.Bot.get_updates`.
            drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
                Telegram servers before actually starting to poll. Default is :obj:`False`.

                .. versionadded :: 13.4
            error_callback (Callable[[:exc:`telegram.error.TelegramError`], :obj:`None`], \
                optional): Callback to handle :exc:`telegram.error.TelegramError` s that occur
                while calling :meth:`telegram.Bot.get_updates` during polling. Defaults to
                :obj:`None`, in which case errors will be logged. Callback signature::

                    def callback(error: telegram.error.TelegramError)

                Note:
                    The :paramref:`error_callback` must *not* be a :term:`coroutine function`! If
                    asynchronous behavior of the callback is wanted, please schedule a task from
                    within the callback.

        Returns:
            :class:`asyncio.Queue`: The update queue that can be filled from the main thread.

        Raises:
            :exc:`RuntimeError`: If the updater is already running or was not initialized.

        """
        # We refrain from issuing deprecation warnings for the timeout parameters here, as we
        # already issue them in `Application`. This means that there are no warnings when using
        # `Updater` without `Application`, but this is a rather special use case.

        if error_callback and asyncio.iscoroutinefunction(error_callback):
            raise TypeError(
                "The `error_callback` must not be a coroutine function! Use an ordinary function "
                "instead. "
            )

        async with self.__lock:
            if self.running:
                raise RuntimeError("This Updater is already running!")
            if not self._initialized:
                raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

            self._running = True

            try:
                # Create & start tasks
                polling_ready = asyncio.Event()

                await self._start_polling(
                    poll_interval=poll_interval,
                    timeout=timeout,
                    read_timeout=read_timeout,
                    write_timeout=write_timeout,
                    connect_timeout=connect_timeout,
                    pool_timeout=pool_timeout,
                    bootstrap_retries=bootstrap_retries,
                    drop_pending_updates=drop_pending_updates,
                    allowed_updates=allowed_updates,
                    ready=polling_ready,
                    error_callback=error_callback,
                )

                _LOGGER.debug("Waiting for polling to start")
                await polling_ready.wait()
                _LOGGER.debug("Polling updates from Telegram started")

                return self.update_queue
            except Exception as exc:
                self._running = False
                raise exc

    async def _start_polling(
        self,
        poll_interval: float,
        timeout: int,
        read_timeout: ODVInput[float],
        write_timeout: ODVInput[float],
        connect_timeout: ODVInput[float],
        pool_timeout: ODVInput[float],
        bootstrap_retries: int,
        drop_pending_updates: Optional[bool],
        allowed_updates: Optional[List[str]],
        ready: asyncio.Event,
        error_callback: Optional[Callable[[TelegramError], None]],
    ) -> None:
        _LOGGER.debug("Updater started (polling)")

        # the bootstrapping phase does two things:
        # 1) make sure there is no webhook set
        # 2) apply drop_pending_updates
        await self._bootstrap(
            bootstrap_retries,
            drop_pending_updates=drop_pending_updates,
            webhook_url="",
            allowed_updates=None,
        )

        _LOGGER.debug("Bootstrap done")

        async def polling_action_cb() -> bool:
            try:
                updates = await self.bot.get_updates(
                    offset=self._last_update_id,
                    timeout=timeout,
                    read_timeout=read_timeout,
                    connect_timeout=connect_timeout,
                    write_timeout=write_timeout,
                    pool_timeout=pool_timeout,
                    allowed_updates=allowed_updates,
                )
            except TelegramError as exc:
                # TelegramErrors should be processed by the network retry loop
                raise exc
            except Exception as exc:
                # Other exceptions should not. Let's log them for now.
                _LOGGER.critical(
                    "Something went wrong processing the data received from Telegram. "
                    "Received data was *not* processed!",
                    exc_info=exc,
                )
                return True

            if updates:
                if not self.running:
                    _LOGGER.critical(
                        "Updater stopped unexpectedly. Pulled updates will be ignored and pulled "
                        "again on restart."
                    )
                else:
                    for update in updates:
                        await self.update_queue.put(update)
                    self._last_update_id = updates[-1].update_id + 1  # Add one to 'confirm' it

            return True  # Keep fetching updates & don't quit. Polls with poll_interval.

        def default_error_callback(exc: TelegramError) -> None:
            _LOGGER.exception("Exception happened while polling for updates.", exc_info=exc)

        # Start task that runs in background, pulls
        # updates from Telegram and inserts them in the update queue of the
        # Application.
        self.__polling_task = asyncio.create_task(
            self._network_loop_retry(
                action_cb=polling_action_cb,
                on_err_cb=error_callback or default_error_callback,
                description="getting Updates",
                interval=poll_interval,
                stop_event=self.__polling_task_stop_event,
            ),
            name="Updater:start_polling:polling_task",
        )

        # Prepare a cleanup callback to await on _stop_polling
        # Calling get_updates one more time with the latest `offset` parameter ensures that
        # all updates that where put into the update queue are also marked as "read" to TG,
        # so we do not receive them again on the next startup
        # We define this here so that we can use the same parameters as in the polling task
        async def _get_updates_cleanup() -> None:
            _LOGGER.debug(
                "Calling `get_updates` one more time to mark all fetched updates as read."
            )
            try:
                await self.bot.get_updates(
                    offset=self._last_update_id,
                    # We don't want to do long polling here!
                    timeout=0,
                    read_timeout=read_timeout,
                    connect_timeout=connect_timeout,
                    write_timeout=write_timeout,
                    pool_timeout=pool_timeout,
                    allowed_updates=allowed_updates,
                )
            except TelegramError as exc:
                _LOGGER.error(
                    "Error while calling `get_updates` one more time to mark all fetched updates "
                    "as read: %s. Suppressing error to ensure graceful shutdown. When polling for "
                    "updates is restarted, updates may be fetched again. Please adjust timeouts "
                    "via `ApplicationBuilder` or the parameter `get_updates_request` of `Bot`.",
                    exc_info=exc,
                )

        self.__polling_cleanup_cb = _get_updates_cleanup

        if ready is not None:
            ready.set()

    async def start_webhook(
        self,
        listen: DVType[str] = DEFAULT_IP,
        port: DVType[int] = DEFAULT_80,
        url_path: str = "",
        cert: Optional[Union[str, Path]] = None,
        key: Optional[Union[str, Path]] = None,
        bootstrap_retries: int = 0,
        webhook_url: Optional[str] = None,
        allowed_updates: Optional[List[str]] = None,
        drop_pending_updates: Optional[bool] = None,
        ip_address: Optional[str] = None,
        max_connections: int = 40,
        secret_token: Optional[str] = None,
        unix: Optional[Union[str, Path, "socket"]] = None,
    ) -> "asyncio.Queue[object]":
        """
        Starts a small http server to listen for updates via webhook. If :paramref:`cert`
        and :paramref:`key` are not provided, the webhook will be started directly on
        ``http://listen:port/url_path``, so SSL can be handled by another
        application. Else, the webhook will be started on
        ``https://listen:port/url_path``. Also calls :meth:`telegram.Bot.set_webhook` as required.

        Important:
            If you want to use this method, you must install PTB with the optional requirement
            ``webhooks``, i.e.

            .. code-block:: bash

               pip install "python-telegram-bot[webhooks]"

        .. seealso:: :wiki:`Webhooks`

        .. versionchanged:: 13.4
            :meth:`start_webhook` now *always* calls :meth:`telegram.Bot.set_webhook`, so pass
            ``webhook_url`` instead of calling ``updater.bot.set_webhook(webhook_url)`` manually.
        .. versionchanged:: 20.0

            * Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates` and
              removed the deprecated argument ``force_event_loop``.

        Args:
            listen (:obj:`str`, optional): IP-Address to listen on. Defaults to
                `127.0.0.1 <https://en.wikipedia.org/wiki/Localhost>`_.
            port (:obj:`int`, optional): Port the bot should be listening on. Must be one of
                :attr:`telegram.constants.SUPPORTED_WEBHOOK_PORTS` unless the bot is running
                behind a proxy. Defaults to ``80``.
            url_path (:obj:`str`, optional): Path inside url (http(s)://listen:port/<url_path>).
                Defaults to ``''``.
            cert (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL certificate file.
            key (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL key file.
            drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
                Telegram servers before actually starting to poll. Default is :obj:`False`.

                .. versionadded :: 13.4
            bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
                :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

                * < 0 - retry indefinitely
                *   0 - no retries (default)
                * > 0 - retry up to X times
            webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
                NAT, reverse proxy, etc. Default is derived from :paramref:`listen`,
                :paramref:`port`, :paramref:`url_path`, :paramref:`cert`, and :paramref:`key`.
            ip_address (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
                Defaults to :obj:`None`.

                .. versionadded :: 13.4
            allowed_updates (List[:obj:`str`], optional): Passed to
                :meth:`telegram.Bot.set_webhook`. Defaults to :obj:`None`.
            max_connections (:obj:`int`, optional): Passed to
                :meth:`telegram.Bot.set_webhook`. Defaults to ``40``.

                .. versionadded:: 13.6
            secret_token (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
                Defaults to :obj:`None`.

                When added, the web server started by this call will expect the token to be set in
                the ``X-Telegram-Bot-Api-Secret-Token`` header of an incoming request and will
                raise a :class:`http.HTTPStatus.FORBIDDEN <http.HTTPStatus>` error if either the
                header isn't set or it is set to a wrong token.

                .. versionadded:: 20.0
            unix (:class:`pathlib.Path` | :obj:`str` | :class:`socket.socket`, optional): Can be
                either:

                * the path to the unix socket file as :class:`pathlib.Path` or :obj:`str`. This
                  will be passed to `tornado.netutil.bind_unix_socket <https://www.tornadoweb.org/
                  en/stable/netutil.html#tornado.netutil.bind_unix_socket>`_ to create the socket.
                  If the Path does not exist, the file will be created.

                * or the socket itself. This option allows you to e.g. restrict the permissions of
                  the socket for improved security. Note that you need to pass the correct family,
                  type and socket options yourself.

                Caution:
                    This parameter is a replacement for the default TCP bind. Therefore, it is
                    mutually exclusive with :paramref:`listen` and :paramref:`port`. When using
                    this param, you must also run a reverse proxy to the unix socket and set the
                    appropriate :paramref:`webhook_url`.

                .. versionadded:: 20.8
                .. versionchanged:: 21.1
                    Added support to pass a socket instance itself.
        Returns:
            :class:`queue.Queue`: The update queue that can be filled from the main thread.

        Raises:
            :exc:`RuntimeError`: If the updater is already running or was not initialized.
        """
        if not WEBHOOKS_AVAILABLE:
            raise RuntimeError(
                "To use `start_webhook`, PTB must be installed via `pip install "
                '"python-telegram-bot[webhooks]"`.'
            )
        # unix has special requirements what must and mustn't be set when using it
        if unix:
            error_msg = (
                "You can not pass unix and {0}, only use one. Unix if you want to "
                "initialize a unix socket, or {0} for a standard TCP server."
            )
            if not isinstance(listen, DefaultValue):
                raise RuntimeError(error_msg.format("listen"))
            if not isinstance(port, DefaultValue):
                raise RuntimeError(error_msg.format("port"))
            if not webhook_url:
                raise RuntimeError(
                    "Since you set unix, you also need to set the URL to the webhook "
                    "of the proxy you run in front of the unix socket."
                )

        async with self.__lock:
            if self.running:
                raise RuntimeError("This Updater is already running!")
            if not self._initialized:
                raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

            self._running = True

            try:
                # Create & start tasks
                webhook_ready = asyncio.Event()

                await self._start_webhook(
                    listen=DefaultValue.get_value(listen),
                    port=DefaultValue.get_value(port),
                    url_path=url_path,
                    cert=cert,
                    key=key,
                    bootstrap_retries=bootstrap_retries,
                    drop_pending_updates=drop_pending_updates,
                    webhook_url=webhook_url,
                    allowed_updates=allowed_updates,
                    ready=webhook_ready,
                    ip_address=ip_address,
                    max_connections=max_connections,
                    secret_token=secret_token,
                    unix=unix,
                )

                _LOGGER.debug("Waiting for webhook server to start")
                await webhook_ready.wait()
                _LOGGER.debug("Webhook server started")
            except Exception as exc:
                self._running = False
                raise exc

            # Return the update queue so the main thread can insert updates
            return self.update_queue

    async def _start_webhook(
        self,
        listen: str,
        port: int,
        url_path: str,
        bootstrap_retries: int,
        allowed_updates: Optional[List[str]],
        cert: Optional[Union[str, Path]] = None,
        key: Optional[Union[str, Path]] = None,
        drop_pending_updates: Optional[bool] = None,
        webhook_url: Optional[str] = None,
        ready: Optional[asyncio.Event] = None,
        ip_address: Optional[str] = None,
        max_connections: int = 40,
        secret_token: Optional[str] = None,
        unix: Optional[Union[str, Path, "socket"]] = None,
    ) -> None:
        _LOGGER.debug("Updater thread started (webhook)")

        if not url_path.startswith("/"):
            url_path = f"/{url_path}"

        # Create Tornado app instance
        app = WebhookAppClass(url_path, self.bot, self.update_queue, secret_token)

        # Form SSL Context
        # An SSLError is raised if the private key does not match with the certificate
        # Note that we only use the SSL certificate for the WebhookServer, if the key is also
        # present. This is because the WebhookServer may not actually be in charge of performing
        # the SSL handshake, e.g. in case a reverse proxy is used
        if cert is not None and key is not None:
            try:
                ssl_ctx: Optional[ssl.SSLContext] = ssl.create_default_context(
                    ssl.Purpose.CLIENT_AUTH
                )
                ssl_ctx.load_cert_chain(cert, key)  # type: ignore[union-attr]
            except ssl.SSLError as exc:
                raise TelegramError("Invalid SSL Certificate") from exc
        else:
            ssl_ctx = None
        # Create and start server
        self._httpd = WebhookServer(listen, port, app, ssl_ctx, unix)

        if not webhook_url:
            webhook_url = self._gen_webhook_url(
                protocol="https" if ssl_ctx else "http",
                listen=DefaultValue.get_value(listen),
                port=port,
                url_path=url_path,
            )

        # We pass along the cert to the webhook if present.
        await self._bootstrap(
            # Passing a Path or string only works if the bot is running against a local bot API
            # server, so let's read the contents
            cert=Path(cert).read_bytes() if cert else None,
            max_retries=bootstrap_retries,
            drop_pending_updates=drop_pending_updates,
            webhook_url=webhook_url,
            allowed_updates=allowed_updates,
            ip_address=ip_address,
            max_connections=max_connections,
            secret_token=secret_token,
        )

        await self._httpd.serve_forever(ready=ready)

    @staticmethod
    def _gen_webhook_url(protocol: str, listen: str, port: int, url_path: str) -> str:
        # TODO: double check if this should be https in any case - the docs of start_webhook
        # say differently!
        return f"{protocol}://{listen}:{port}{url_path}"

    async def _network_loop_retry(
        self,
        action_cb: Callable[..., Coroutine],
        on_err_cb: Callable[[TelegramError], None],
        description: str,
        interval: float,
        stop_event: Optional[asyncio.Event],
    ) -> None:
        """Perform a loop calling `action_cb`, retrying after network errors.

        Stop condition for loop: `self.running` evaluates :obj:`False` or return value of
        `action_cb` evaluates :obj:`False`.

        Args:
            action_cb (:term:`coroutine function`): Network oriented callback function to call.
            on_err_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives
                the exception object as a parameter.
            description (:obj:`str`): Description text to use for logs and exception raised.
            interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to
                `action_cb`.
            stop_event (:class:`asyncio.Event` | :obj:`None`): Event to wait on for stopping the
                loop. Setting the event will make the loop exit even if `action_cb` is currently
                running.

        """

        async def do_action() -> bool:
            if not stop_event:
                return await action_cb()

            action_cb_task = asyncio.create_task(action_cb())
            stop_task = asyncio.create_task(stop_event.wait())
            done, pending = await asyncio.wait(
                (action_cb_task, stop_task), return_when=asyncio.FIRST_COMPLETED
            )
            with contextlib.suppress(asyncio.CancelledError):
                for task in pending:
                    task.cancel()

            if stop_task in done:
                _LOGGER.debug("Network loop retry %s was cancelled", description)
                return False

            return action_cb_task.result()

        _LOGGER.debug("Start network loop retry %s", description)
        cur_interval = interval
        while self.running:
            try:
                if not await do_action():
                    break
            except RetryAfter as exc:
                _LOGGER.info("%s", exc)
                cur_interval = 0.5 + exc.retry_after
            except TimedOut as toe:
                _LOGGER.debug("Timed out %s: %s", description, toe)
                # If failure is due to timeout, we should retry asap.
                cur_interval = 0
            except InvalidToken as pex:
                _LOGGER.error("Invalid token; aborting")
                raise pex
            except TelegramError as telegram_exc:
                _LOGGER.error("Error while %s: %s", description, telegram_exc)
                on_err_cb(telegram_exc)

                # increase waiting times on subsequent errors up to 30secs
                cur_interval = 1 if cur_interval == 0 else min(30, 1.5 * cur_interval)
            else:
                cur_interval = interval

            if cur_interval:
                await asyncio.sleep(cur_interval)

    async def _bootstrap(
        self,
        max_retries: int,
        webhook_url: Optional[str],
        allowed_updates: Optional[List[str]],
        drop_pending_updates: Optional[bool] = None,
        cert: Optional[bytes] = None,
        bootstrap_interval: float = 1,
        ip_address: Optional[str] = None,
        max_connections: int = 40,
        secret_token: Optional[str] = None,
    ) -> None:
        """Prepares the setup for fetching updates: delete or set the webhook and drop pending
        updates if appropriate. If there are unsuccessful attempts, this will retry as specified by
        :paramref:`max_retries`.
        """
        retries = 0

        async def bootstrap_del_webhook() -> bool:
            _LOGGER.debug("Deleting webhook")
            if drop_pending_updates:
                _LOGGER.debug("Dropping pending updates from Telegram server")
            await self.bot.delete_webhook(drop_pending_updates=drop_pending_updates)
            return False

        async def bootstrap_set_webhook() -> bool:
            _LOGGER.debug("Setting webhook")
            if drop_pending_updates:
                _LOGGER.debug("Dropping pending updates from Telegram server")
            await self.bot.set_webhook(
                url=webhook_url,  # type: ignore[arg-type]
                certificate=cert,
                allowed_updates=allowed_updates,
                ip_address=ip_address,
                drop_pending_updates=drop_pending_updates,
                max_connections=max_connections,
                secret_token=secret_token,
            )
            return False

        def bootstrap_on_err_cb(exc: Exception) -> None:
            # We need this since retries is an immutable object otherwise and the changes
            # wouldn't propagate outside of thi function
            nonlocal retries

            if not isinstance(exc, InvalidToken) and (max_retries < 0 or retries < max_retries):
                retries += 1
                _LOGGER.warning(
                    "Failed bootstrap phase; try=%s max_retries=%s", retries, max_retries
                )
            else:
                _LOGGER.error("Failed bootstrap phase after %s retries (%s)", retries, exc)
                raise exc

        # Dropping pending updates from TG can be efficiently done with the drop_pending_updates
        # parameter of delete/start_webhook, even in the case of polling. Also, we want to make
        # sure that no webhook is configured in case of polling, so we just always call
        # delete_webhook for polling
        if drop_pending_updates or not webhook_url:
            await self._network_loop_retry(
                bootstrap_del_webhook,
                bootstrap_on_err_cb,
                "bootstrap del webhook",
                bootstrap_interval,
                stop_event=None,
            )

            # Reset the retries counter for the next _network_loop_retry call
            retries = 0

        # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
        # so we set it anyhow.
        if webhook_url:
            await self._network_loop_retry(
                bootstrap_set_webhook,
                bootstrap_on_err_cb,
                "bootstrap set webhook",
                bootstrap_interval,
                stop_event=None,
            )

    async def stop(self) -> None:
        """Stops the polling/webhook.

        .. seealso::
            :meth:`start_polling`, :meth:`start_webhook`

        Raises:
            :exc:`RuntimeError`: If the updater is not running.
        """
        async with self.__lock:
            if not self.running:
                raise RuntimeError("This Updater is not running!")

            _LOGGER.debug("Stopping Updater")

            self._running = False

            await self._stop_httpd()
            await self._stop_polling()

            _LOGGER.debug("Updater.stop() is complete")

    async def _stop_httpd(self) -> None:
        """Stops the Webhook server by calling ``WebhookServer.shutdown()``"""
        if self._httpd:
            _LOGGER.debug("Waiting for current webhook connection to be closed.")
            await self._httpd.shutdown()
            self._httpd = None

    async def _stop_polling(self) -> None:
        """Stops the polling task by awaiting it."""
        if self.__polling_task:
            _LOGGER.debug("Waiting background polling task to finish up.")
            self.__polling_task_stop_event.set()

            with contextlib.suppress(asyncio.CancelledError):
                await self.__polling_task
                # It only fails in rare edge-cases, e.g. when `stop()` is called directly
                # after start_polling(), but lets better be safe than sorry ...

            self.__polling_task = None
            self.__polling_task_stop_event.clear()

            if self.__polling_cleanup_cb:
                await self.__polling_cleanup_cb()
                self.__polling_cleanup_cb = None
            else:
                _LOGGER.warning(
                    "No polling cleanup callback defined. The last fetched updates may be "
                    "fetched again on the next polling start."
                )