AngellusMortis/sxm-client

View on GitHub
sxm/client.py

Summary

Maintainability
D
1 day
Test Coverage
F
40%
import base64
import datetime
import json
import logging
import re
import time
import traceback
import urllib.parse
from typing import Any, Callable, Dict, List, Optional, Union

import httpx
from fake_useragent import UserAgent  # type: ignore
from make_it_sync import make_sync  # type: ignore
from tenacity import retry, stop_after_attempt, wait_fixed
from ua_parser import user_agent_parser  # type: ignore

from sxm.models import QualitySize, RegionChoice, XMChannel, XMLiveChannel

# Public names exported by a star-import of this module. Every exception type
# the client can raise is listed next to the client classes, including the
# base class and the configuration error raised by the ``configuration``
# properties.
__all__ = [
    "HLS_AES_KEY",
    "SXMClient",
    "SXMClientAsync",
    "SXMError",
    "ConfigurationError",
    "AuthenticationError",
    "SegmentRetrievalException",
]


# Client identity reported to SXM inside the ``deviceInfo`` payload.
SXM_APP_VERSION = "5.36.514"
SXM_DEVICE_MODEL = "EverestWebClient"

# 16-byte key, kept base64-encoded in the source.
# NOTE(review): presumably the AES key for decrypting HLS segments (going by
# the name) -- it is exported but not used anywhere in this module.
HLS_AES_KEY = base64.b64decode("0Nsco7MAgxowGvkUT8aYag==")

# User agent used when one cannot be generated automatically.
FALLBACK_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) "
    "Gecko/20100101 Firefox/89.0"
)

# URL templates for the REST API; ``{}`` is filled with the module path.
REST_V2_FORMAT = "https://player.siriusxm.com/rest/v2/experience/modules/{}"
REST_V4_FORMAT = "https://player.siriusxm.com/rest/v4/experience/modules/{}"

# Maximum age of an HTTP session in seconds (4 hours) before it is rebuilt.
SESSION_MAX_LIFE = 4 * 60 * 60

# When true, the channel list is requested from the v4 endpoint.
ENABLE_NEW_CHANNELS = True


class SXMError(Exception):
    """Root of the exception hierarchy raised by the SXM client."""


class ConfigurationError(SXMError):
    """Retrieving the SXM configuration failed; renew the session and retry later."""


class AuthenticationError(SXMError):
    """Authenticating against SXM failed; the session must be renewed."""


class SegmentRetrievalException(SXMError):
    """An HLS segment could not be fetched; the session must be renewed."""


class SXMClientAsync:
    """Class to interface with SXM api and access HLS
    live streams of audio

    Parameters
    ----------
    username : :class:`str`
        SXM username
    password : :class:`str`
        SXM password
    region : :class:`RegionChoice`
        Sets your SXM account region. Defaults to `RegionChoice.US`.
    quality : :class:`QualitySize`
        Stream quality applied to a live channel when its playlist is
        resolved. Defaults to `QualitySize.LARGE_256k`.
    user_agent : Optional[:class:`str`]
        User Agent string to use for making requests to SXM. If `None` is
        passed, it will attempt to generate one based on real browser usage
        data and fall back to a fixed Firefox string if that fails.
        Defaults to `None`.
    update_handler : Optional[Callable[[:class:`dict`], `None`]]
        Callback to be called whenever a playlist updates and new
        Live Channel data is retrieved. Defaults to `None`.

    Attributes
    ----------
    is_logged_in : :class:`bool`
        Whether the session holds the `SXMAUTHNEW` cookie
    is_session_authenticated : :class:`bool`
        Whether the session holds both the `AWSALB` and `JSESSIONID`
        cookies
    sxmak_token : Optional[:class:`str`]
        Token parsed out of the `SXMAKTOKEN` session cookie, or `None` when
        the cookie is missing or malformed. Sent as the `token` query
        parameter on playlist and segment requests.
    gup_id : Optional[:class:`str`]
        The `gupId` field of the URL-decoded JSON stored in the `SXMDATA`
        session cookie, or `None` when unavailable. Sent as the `gupId`
        query parameter on playlist and segment requests.
    channels : List[:class:`XMChannel`]
        Awaitable property. Retrieves and returns a full list of all
        :class:`XMChannel` available to the logged in account, sorted by
        channel number
    favorite_channels : List[:class:`XMChannel`]
        Awaitable property. Retrieves and returns a full list of all
        :class:`XMChannel` available to the logged in account that are
        marked as favorite
    """

    # ``time.monotonic()`` value recorded when a playlist URL was last
    # resolved; ``None`` until the first one is.
    last_renew: Optional[float]
    password: str
    region: RegionChoice
    update_handler: Optional[Callable[[dict], None]]
    # Maximum age of a cached playlist URL, compared against monotonic-clock
    # differences. Starts at 30 and is replaced by the ``updateFrequency``
    # value of each now-playing response.
    update_interval: int
    username: str
    stream_quality: QualitySize

    # Parsed channel list; ``None`` until the first successful fetch.
    _channels: Optional[List[XMChannel]]
    _favorite_channels: Optional[List[XMChannel]]
    # Cache of resolved playlist variant URLs keyed by channel id.
    _playlists: Dict[str, str]
    # Selects the primary (True) or secondary (False) HLS root.
    _use_primary: bool
    # Result of ``user_agent_parser.Parse`` for the user agent in use.
    _ua: Dict[str, Any]
    # NOTE: ``close_session`` sets this to ``None`` until ``reset_session``
    # creates a new client.
    _session: httpx.AsyncClient
    # Configuration components keyed by name; cleared by ``reset_session``.
    _configuration: Optional[Dict] = None
    # Relative URL name -> URL mapping; cleared by ``reset_session``.
    _urls: Optional[Dict[str, str]] = None

    def __init__(
        self,
        username: str,
        password: str,
        region: RegionChoice = RegionChoice.US,
        quality: QualitySize = QualitySize.LARGE_256k,
        user_agent: Optional[str] = None,
        update_handler: Optional[Callable[[dict], None]] = None,
    ):
        """Store the account settings and open a fresh HTTP session.

        When no ``user_agent`` is supplied one is generated with
        ``fake_useragent``; if that fails for any reason the fixed
        ``FALLBACK_UA`` string is used instead.
        """

        self._log = logging.getLogger(__file__)

        agent = user_agent
        if agent is None:
            try:
                generator = UserAgent(use_cache_server=False)
                generator.update()
                agent = generator.chrome
            except Exception:
                agent = FALLBACK_UA
        self._ua = user_agent_parser.Parse(agent)

        # needs ``_ua`` to be in place for the User-Agent header
        self.reset_session()

        # account and stream settings
        self.username, self.password = username, password
        self.region, self.stream_quality = region, quality

        # caches and HLS root selection
        self._channels = self._favorite_channels = None
        self._playlists = {}
        self._use_primary = True

        # playlist refresh bookkeeping
        self.last_renew = None
        self.update_interval = 30

        # callback fired whenever a playlist is refreshed
        self.update_handler = update_handler

    def __del__(self):
        """Best-effort cleanup: close the HTTP session on garbage collection."""

        # ``__init__`` can raise before ``reset_session`` has assigned
        # ``_session`` (for example while parsing the user agent). The
        # finalizer still runs for that half-built object, and looking up
        # the missing attribute would raise inside ``__del__``. A session
        # that was already closed is ``None`` and needs no work either.
        if getattr(self, "_session", None) is None:
            return

        make_sync(self.close_session)()

    @property
    def is_logged_in(self) -> bool:
        """Whether the session currently holds the ``SXMAUTHNEW`` cookie."""

        cookies = self._session.cookies
        return "SXMAUTHNEW" in cookies

    @property
    def is_session_authenticated(self) -> bool:
        """Whether the session holds both ``AWSALB`` and ``JSESSIONID`` cookies."""

        if "AWSALB" not in self._session.cookies:
            return False
        return "JSESSIONID" in self._session.cookies

    @property
    def sxmak_token(self) -> Union[str, None]:
        """Token extracted from the ``SXMAKTOKEN`` cookie, or ``None``.

        The token is the text between the first ``=`` and the following
        ``,`` of the cookie value. ``None`` is returned when the cookie is
        absent or contains no ``=``.
        """

        try:
            raw = self._session.cookies["SXMAKTOKEN"]
        except KeyError:
            return None

        _, separator, remainder = raw.partition("=")
        if not separator:
            return None
        return remainder.partition(",")[0]

    @property
    def gup_id(self) -> Union[str, None]:
        """``gupId`` field of the ``SXMDATA`` cookie, or ``None``.

        The cookie value is URL-decoded and parsed as JSON. ``None`` is
        returned when the cookie or the field is missing, or when the
        value is not valid JSON.
        """

        try:
            encoded = self._session.cookies["SXMDATA"]
            decoded = urllib.parse.unquote(encoded)
            payload = json.loads(decoded)
            return payload["gupId"]
        except (KeyError, ValueError):
            return None

    @property
    async def channels(self) -> List[XMChannel]:
        """Awaitable list of all channels, sorted by channel number.

        The list is downloaded on first access and cached. An empty
        download is returned as ``[]`` without being cached, so the next
        access tries again.
        """

        if self._channels is None:
            raw_channels = await self.get_channels()
            if not raw_channels:
                return []

            # Build and sort the list before publishing it: if parsing one
            # entry fails, the cache stays unset instead of holding a
            # partial, unsorted list that every later access would return.
            parsed = [XMChannel.from_dict(raw) for raw in raw_channels]
            parsed.sort(key=lambda x: int(x.channel_number))
            self._channels = parsed

        return self._channels

    @property
    async def favorite_channels(self) -> List[XMChannel]:
        """Awaitable list of the channels marked as favorite.

        The result is cached once a non-empty channel list has been
        retrieved.
        """

        if self._favorite_channels is None:
            channels = await self.channels
            if not channels:
                # An empty channel list means the download failed or returned
                # nothing, and ``channels`` does not cache it. Caching an
                # empty favourites list here would make that failure
                # permanent.
                return []
            self._favorite_channels = [c for c in channels if c.is_favorite]
        return self._favorite_channels

    def _extract_configuration(self, data: dict):
        """Index the configuration components of a response by their name.

        Reads ``moduleList.modules[0].moduleResponse.configuration.components``
        from ``data`` and returns a dict mapping each component's ``name`` to
        the component itself.
        """

        module_response = data["moduleList"]["modules"][0]["moduleResponse"]
        components = module_response["configuration"]["components"]
        return {component["name"]: component for component in components}

    @property
    async def configuration(self) -> dict:
        """Awaitable configuration components keyed by name (cached).

        Raises
        ------
        ConfigurationError
            If the configuration request returned no data
        """

        if self._configuration is not None:
            return self._configuration

        response = await self.get_configuration()
        if response is None:
            raise ConfigurationError()

        self._configuration = self._extract_configuration(response)
        return self._configuration

    def _extract_urls(self, urls: dict):
        """Map relative URL names to their URLs.

        Reads ``settings[0].relativeUrls`` from ``urls``; entries without a
        ``url`` key are skipped.
        """

        entries = urls["settings"][0]["relativeUrls"]
        return {entry["name"]: entry["url"] for entry in entries if "url" in entry}

    @property
    async def urls(self) -> Dict[str, str]:
        """Awaitable mapping of relative URL names to URLs (cached)."""

        if self._urls is not None:
            return self._urls

        configuration = await self.configuration
        self._urls = self._extract_urls(configuration["relativeUrls"])
        return self._urls

    @property
    def primary(self) -> bool:
        """Whether the primary HLS root is the one currently selected."""

        return self._use_primary

    async def get_primary_hls_root(self) -> str:
        """Return the ``Live_Primary_HLS`` URL from the configuration."""

        return (await self.urls)["Live_Primary_HLS"]

    async def get_secondary_hls_root(self) -> str:
        """Return the ``Live_Secondary_HLS`` URL from the configuration."""

        return (await self.urls)["Live_Secondary_HLS"]

    async def get_hls_root(self) -> str:
        """Return the HLS root currently selected (primary or secondary)."""

        if self._use_primary:
            fetch_root = self.get_primary_hls_root
        else:
            fetch_root = self.get_secondary_hls_root
        return await fetch_root()

    def set_primary(self, value: bool):
        """Select the primary or secondary HLS root and drop cached playlists."""

        # cached playlist URLs were resolved for the previous selection
        self._playlists = {}
        self._use_primary = value

    async def login(self) -> bool:
        """Log into SXM with the stored username and password.

        Returns ``True`` only when the response reports ``status == 1`` and
        the login cookie is present afterwards.
        """

        self._log.debug(f"Logging in as {self.username}...")

        credentials = {"username": self.username, "password": self.password}
        payload = {**self._get_device_info(), "standardAuth": credentials}

        data = await self._post("modify/authentication", payload, authenticate=False)
        if not data:
            return False

        try:
            status = data["status"]
        except KeyError:
            self._log.error("Error decoding json response for login")
            return False

        return status == 1 and self.is_logged_in

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(10))
    async def authenticate(self) -> bool:
        """Create a valid session for use with the client.

        Logs in first when necessary, then resumes the session. Returns
        ``True`` only when the response reports ``status == 1`` and the
        session cookies are present afterwards.

        Raises
        ------
        AuthenticationError
            If login failed; the session has been closed and reset by then.
            NOTE(review): this method is wrapped in ``@retry``, so the
            error presumably triggers another attempt and, once attempts
            are exhausted, surfaces through tenacity's own error type --
            confirm against the tenacity defaults.
        """

        logged_in = self.is_logged_in or await self.login()
        if not logged_in:
            self._log.error("Unable to authenticate because login failed")
            await self.close_session()
            self.reset_session()
            raise AuthenticationError("Reset session")

        device_info = self._get_device_info()
        data = await self._post("resume?OAtrial=false", device_info, authenticate=False)
        if not data:
            return False

        try:
            status = data["status"]
        except KeyError:
            self._log.error("Error parsing json response for authentication")
            self._log.error(traceback.format_exc())
            return False

        return status == 1 and self.is_session_authenticated

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(10))
    async def get_configuration(self) -> Optional[Dict[str, Any]]:
        """Request the raw configuration for the account's region.

        Returns the decoded response, or ``None`` when the request failed.
        """

        region = self.region.value
        # the current epoch second keeps the request from being cached
        cache_buster = str(int(time.time()))

        query = {
            "result-template": "html5",
            "app-region": region,
            "cacheBuster": cache_buster,
        }
        return await self._get("get/configuration", params=query)

    @retry(stop=stop_after_attempt(25), wait=wait_fixed(1))
    async def get_playlist(
        self, channel_id: str, use_cache: bool = True
    ) -> Union[str, None]:
        """Gets playlist of HLS stream URLs for given channel ID

        Segment lines (ending in ``.aac``) are rewritten to include the
        ``AAC_Data...`` base path of the playlist URL; every other line is
        passed through stripped of surrounding whitespace.

        Parameters
        ----------
        channel_id : :class:`str`
            ID of SXM channel to retrieve playlist for
        use_cache : :class:`bool`
            Use cached playlist URLs, or force a new retrieval. Defaults to
            `True`

        Returns
        -------
        Optional[:class:`str`]
            The playlist text, or `None` when the playlist URL could not be
            resolved or the playlist could not be downloaded
        """

        url = await self._get_playlist_url(channel_id, use_cache)
        if url is None:
            return None

        try:
            response = await self._make_request("GET", url, self._token_params())
        except httpx.RequestError as e:
            self._log.error(f"Error getting playlist: {e}")
            return None

        if response.status_code == 403:
            if use_cache:
                # the cached playlist URL may be stale: resolve a fresh one
                self._log.info("Received status code 403 on playlist, renewing session")
                return await self.get_playlist(channel_id, False)

            # A freshly resolved URL was rejected as well. Trying again
            # from here would recurse without bound while the server keeps
            # answering 403, so report the failure to the caller instead.
            self._log.warning(
                "Received status code 403 on a freshly resolved playlist, giving up"
            )
            return None

        if response.is_error:
            self._log.warning(
                f"Received status code {response.status_code} on "
                f"playlist variant"
            )
            return None

        # add base path to segments
        base_match = re.search("AAC_Data.*", url)
        if base_match is None:
            # an unexpected URL shape is not transient; raising here would
            # only make the retry decorator repeat the same failure
            self._log.error("Playlist URL does not contain an AAC_Data path")
            return None
        aac_path = base_match.group(0)
        variant_name = re.compile(r"[^\/]\w+\.m3u8")

        playlist_entries = []
        for line in response.text.split("\n"):
            line = line.strip()
            if line.endswith(".aac"):
                # the callable replacement inserts the segment name
                # literally, without interpreting backslash escapes
                playlist_entries.append(
                    variant_name.sub(lambda _match, segment=line: segment, aac_path)
                )
            else:
                playlist_entries.append(line)

        return "\n".join(playlist_entries)

    @retry(wait=wait_fixed(1), stop=stop_after_attempt(5))
    async def get_segment(self, path: str) -> Union[bytes, None]:
        """Gets raw HLS segment for given path

        Parameters
        ----------
        path : :class:`str`
            SXM path, joined onto the currently selected HLS root

        Returns
        -------
        Optional[:class:`bytes`]
            The segment content, or `None` when the server answered with an
            error status other than 403

        Raises
        ------
        SegmentRetrievalException
            If segments are starting to come back forbidden and session
            needs reset
        """

        url = urllib.parse.urljoin(await self.get_hls_root(), path)
        res = await self._session.get(url, params=self._token_params())

        if res.status_code == 403:
            raise SegmentRetrievalException(
                "Received status code 403 on segment, renew session"
            )

        if res.is_error:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``
            self._log.warning(f"Received status code {res.status_code} on segment")
            return None

        return res.content

    async def get_channels(self) -> List[dict]:
        """Gets raw list of channel dictionaries from SXM. Each channel
        dict can be passed to :meth:`XMChannel.from_dict` to turn it
        into an object.

        Returns an empty list when the request fails or the response does
        not have the expected structure.
        """

        postdata = {
            "consumeRequests": [],
            "resultTemplate": "responsive",
            "alerts": [],
            "profileInfos": [],
        }

        if ENABLE_NEW_CHANNELS:
            data = await self._post(
                "get?type=2",
                postdata,
                channel_list=True,
                url_format=REST_V4_FORMAT,
            )
        else:
            data = await self._post("get", postdata, channel_list=True)

        if not data:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``
            self._log.warning("Unable to get channel list")
            return []

        try:
            return data["moduleList"]["modules"][0]["moduleResponse"][
                "contentData"
            ]["channelListing"]["channels"]
        except (KeyError, IndexError):
            self._log.error("Error parsing json response for channels")
            self._log.error(traceback.format_exc())
            return []

    async def get_channel(self, name: str) -> Union[XMChannel, None]:
        """Retrieves a specific channel from `self.channels`

        Name and id are matched case-insensitively. The channel number is
        matched on its string form.

        Parameters
        ----------
        name : :class:`str`
            name, id, or channel number of SXM channel to get

        Returns
        -------
        Optional[:class:`XMChannel`]
            The first matching channel, or `None` if nothing matches
        """

        # Normalise both sides to text: the channel number is converted
        # with ``int()`` elsewhere in this class, so it may not be a string
        # and would then never compare equal to the requested value.
        wanted = str(name).lower()
        for channel in await self.channels:
            if (
                channel.name.lower() == wanted
                or channel.id.lower() == wanted
                or str(channel.channel_number) == wanted
            ):
                return channel
        return None

    async def get_now_playing(self, channel: XMChannel) -> Union[Dict[str, Any], None]:
        """Gets raw dictionary of response data for the live channel.

        `data['messages'][0]['code']`
            will have the status response code from SXM

        `data['moduleList']['modules'][0]['moduleResponse']['liveChannelData']`
            will have the raw data that can be passed into
            :class:`XMLiveChannel` constructor to create an object

        Parameters
        ----------
        channel : :class:`XMChannel`
            SXM channel to look up live channel data for
        """

        now = time.time()

        # Convert the epoch time straight to UTC. Building a naive local
        # datetime and then labelling it UTC shifts the instant by the
        # machine's UTC offset.
        now_dt = datetime.datetime.fromtimestamp(now, tz=datetime.timezone.utc)

        # ``isoformat`` of an aware datetime already ends in ``+00:00``;
        # swap that designator for ``Z`` rather than appending a second one.
        timestamp = now_dt.isoformat("T", timespec="milliseconds").replace(
            "+00:00", "Z"
        )

        params = {
            "assetGUID": channel.guid,
            "ccRequestType": "AUDIO_VIDEO",
            "channelId": channel.id,
            "hls_output_mode": "custom",
            "marker_mode": "all_separate_cue_points",
            "result-template": "web",
            "time": str(int(round(now * 1000.0))),
            "timestamp": timestamp,
        }

        return await self._get("tune/now-playing-live", params)

    async def close_session(self):
        """Close the HTTP session, if there is one, and forget it."""

        session = self._session
        if session is None:
            return

        await session.aclose()
        self._session = None

    def reset_session(self) -> None:
        """Replace the HTTP session with a new one and clear cached configuration.

        The session start time is restarted, and the cached URLs and
        configuration are discarded. The previous session is not closed
        here.
        """

        self._session_start = time.monotonic()

        user_agent_header = {"User-Agent": self._ua["string"]}
        self._session = httpx.AsyncClient()
        self._session.headers.update(user_agent_header)

        self._urls = self._configuration = None

    def _token_params(self) -> Dict[str, Union[str, None]]:
        """Build the query parameters sent with playlist and segment requests."""

        return dict(
            token=self.sxmak_token,
            consumer="k2",
            gupId=self.gup_id,
        )

    def _get_device_info(self) -> dict:
        """Build the device description sent to SXM.

        The browser version is the user agent's major version followed by
        the minor and patch parts when those are present.
        """

        agent = self._ua["user_agent"]

        version = agent["major"]
        for part_name in ("minor", "patch"):
            part = agent[part_name]
            if part is not None:
                version = f"{version}.{part}"

        device_info = {
            "osVersion": self._ua["os"]["family"],
            "platform": "Web",
            "sxmAppVersion": SXM_APP_VERSION,
            "browser": agent["family"],
            "browserVersion": version,
            "appRegion": self.region.value,
            "deviceModel": SXM_DEVICE_MODEL,
            "clientDeviceId": "null",
            "player": "html5",
            "clientDeviceType": "web",
        }
        return {"resultTemplate": "web", "deviceInfo": device_info}

    async def _make_request(
        self,
        method: str,
        path: str,
        params: Dict[str, Any],
        url_format: str = REST_V2_FORMAT,
    ) -> httpx.Response:
        """Send a GET or POST request through the session.

        Parameters
        ----------
        method : :class:`str`
            ``"GET"`` (``params`` become the query string) or ``"POST"``
            (``params`` become the JSON body)
        path : :class:`str`
            Absolute URL when it starts with ``http``; otherwise a module
            path inserted into ``url_format``
        params : :class:`dict`
            Query parameters or JSON payload
        url_format : :class:`str`
            URL template for relative paths

        Raises
        ------
        httpx.RequestError
            If ``method`` is unsupported or the request fails; failures are
            logged before being re-raised
        """

        url = path if path.startswith("http") else url_format.format(path)

        if method not in ("GET", "POST"):
            # Raised before the ``try`` block: this error has no request
            # attached, and reading ``.request`` on it in the handler below
            # would replace it with a ``RuntimeError``.
            raise httpx.RequestError("only GET and POST")

        try:
            if method == "GET":
                response = await self._session.get(url, params=params)
            else:
                response = await self._session.post(url, json=params)
        except httpx.RequestError as e:
            self._log.error(
                f"An Exception occurred when trying to perform "
                f"the {method} request!"
            )
            # NOTE(review): ``params`` holds the account password on login
            # requests and the session token on HLS requests, so this line
            # writes secrets to the log -- consider redacting.
            self._log.error(f"Params: {params}")
            self._log.error(f"Method: {method}")

            # ``.request`` raises RuntimeError when no request was attached
            try:
                request = e.request
            except RuntimeError:
                request = None
            self._log.error(f"Request: {request}")

            if isinstance(e, httpx.HTTPStatusError):
                self._log.error(f"Response: {e.response}")  # pylint: disable=no-member
            raise

        return response

    async def _request(
        self,
        method: str,
        path: str,
        params: Dict[str, str],
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Makes a GET or POST request to SXM servers

        With ``authenticate`` set, a session older than ``SESSION_MAX_LIFE``
        is replaced first, and the session is authenticated when it is not
        already.

        Returns
        -------
        Optional[:class:`dict`]
            The ``ModuleListResponse`` object of the JSON response, or
            `None` when authentication failed, the server answered with an
            error status, or the body could not be decoded
        """

        method = method.upper()

        if authenticate:
            session_age = time.monotonic() - self._session_start
            if session_age > SESSION_MAX_LIFE:
                self._log.info("Session exceed max time, reseting")
                await self.close_session()
                self.reset_session()

            if not self.is_session_authenticated and not await self.authenticate():
                self._log.error("Unable to authenticate")
                return None

        response = await self._make_request(method, path, params, url_format=url_format)

        if response.is_error:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``
            self._log.warning(
                f"Received status code {response.status_code} for " f"path '{path}'"
            )
            self._log.warning(f"Response: {response.text}")
            return None

        try:
            return response.json()["ModuleListResponse"]
        except (KeyError, ValueError):
            self._log.error(f"Error decoding json for path '{path}'")
            return None

    async def _get(
        self,
        path: str,
        params: Dict[str, str],
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Send a GET request to the SXM servers via ``_request``."""

        return await self._request(
            method="GET",
            path=path,
            params=params,
            authenticate=authenticate,
            url_format=url_format,
        )

    async def _post(
        self,
        path: str,
        postdata: dict,
        channel_list: bool = False,
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Send a POST request to the SXM servers via ``_request``.

        ``postdata`` is wrapped as the ``moduleRequest`` of a single-module
        ``moduleList`` envelope. With ``channel_list`` set, the module is
        turned into a channel-listing request, which replaces the supplied
        ``moduleRequest``.
        """

        module: Dict[str, Any] = {"moduleRequest": postdata}
        if channel_list:
            module.update(
                {
                    "moduleArea": "Discovery",
                    "moduleType": "ChannelListing",
                    "moduleRequest": {"resultTemplate": "responsive"},
                }
            )

        envelope = {"moduleList": {"modules": [module]}}
        return await self._request(
            "POST", path, envelope, authenticate, url_format=url_format
        )

    async def _get_playlist_url(
        self,
        channel_id: str,
        use_cache: bool = True,
        max_attempts: int = 5,
    ) -> Union[str, None]:
        """Returns HLS live stream URL for a given `XMChannel`

        Parameters
        ----------
        channel_id : :class:`str`
            name, id, or channel number of the channel
        use_cache : :class:`bool`
            Return the cached URL when it is younger than
            ``update_interval``. Defaults to `True`
        max_attempts : :class:`int`
            How many more times the lookup may be repeated after
            re-authenticating. Defaults to 5

        Returns
        -------
        Optional[:class:`str`]
            The playlist variant URL, or `None` when the channel is
            unknown, the now-playing request fails or reports an error, or
            re-authentication fails
        """

        channel = await self.get_channel(channel_id)
        if channel is None:
            self._log.info(f"No channel for {channel_id}")
            return None

        now = time.monotonic()
        if use_cache and channel.id in self._playlists:
            if (
                self.last_renew is None
                or (now - self.last_renew) > self.update_interval
            ):
                # cached URL is too old, resolve a new one below
                del self._playlists[channel.id]
            else:
                return self._playlists[channel.id]

        data = await self.get_now_playing(channel)
        if data is None:
            return None

        # parse response
        try:
            message = data["messages"][0]["message"]
            message_code = data["messages"][0]["code"]
        except (KeyError, IndexError):
            self._log.error("Error parsing json response for playlist")
            self._log.error(traceback.format_exc())
            return None

        # 201/208: session expired; 204: multiple logins. Every one of
        # them re-authenticates and repeats the lookup, so every one of
        # them is bounded by ``max_attempts``.
        if message_code in (201, 208, 204):
            if max_attempts <= 0:
                self._log.warning("Reached max attempts for playlist")
                return None

            if message_code == 204:
                self._log.warning("Multiple login error received, reseting session")
                await self.close_session()
                self.reset_session()
            else:
                self._log.info("Session expired, logging in and authenticating")

            if not await self.authenticate():
                self._log.error("Failed to authenticate")
                return None

            self._log.info("Successfully authenticated")
            return await self._get_playlist_url(channel.id, use_cache, max_attempts - 1)

        if message_code != 100:
            self._log.warning(f"Received error {message_code} {message}")
            return None

        live_channel_raw = data["moduleList"]["modules"][0]
        live_channel = XMLiveChannel.from_dict(live_channel_raw)
        live_channel.set_stream_quality(self.stream_quality)
        live_channel.set_hls_roots(
            await self.get_primary_hls_root(), await self.get_secondary_hls_root()
        )

        self.update_interval = int(live_channel_raw["updateFrequency"])

        # get m3u8 url
        url = live_channel.primary_hls.url
        if not self._use_primary:
            url = live_channel.secondary_hls.url

        playlist = await self._get_playlist_variant_url(url)
        if playlist is None:
            return None

        self._playlists[channel.id] = playlist
        self.last_renew = time.monotonic()

        if self.update_handler is not None:
            self.update_handler(live_channel_raw)
        return self._playlists[channel.id]

    async def _get_playlist_variant_url(self, url: str) -> Union[str, None]:
        """Resolve a master playlist URL to the URL of its first variant.

        Downloads ``url`` and returns the first line ending in ``.m3u8``,
        resolved against the directory of ``url``. Returns `None` when the
        download fails or no such line exists.
        """

        res = await self._session.get(url, params=self._token_params())

        if res.is_error:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``
            self._log.warning(
                f"Received status code {res.status_code} on playlist "
                f"variant retrieval"
            )
            return None

        base_url = url.rsplit("/", 1)[0]
        for line in res.text.split("\n"):
            variant = line.rstrip()
            if variant.endswith(".m3u8"):
                # first variant should be 256k one
                return f"{base_url}/{variant}"

        return None


class SXMClient:
    """Sync wrapper class around SXMClientAsync

    Every coroutine of the wrapped client is exposed as a blocking method
    through ``make_sync``. Attributes and cached state are read from, and
    written to, the wrapped client.

    Parameters
    ----------
    username : :class:`str`
        SXM username
    password : :class:`str`
        SXM password
    region : :class:`RegionChoice`
        Sets your SXM account region. Defaults to `RegionChoice.US`.
    quality : :class:`QualitySize`
        Stream quality handed to the wrapped client. Defaults to
        `QualitySize.LARGE_256k`.
    user_agent : Optional[:class:`str`]
        User Agent string to use for making requests to SXM. If `None` is
        passed, it will attempt to generate one based on real browser usage
        data. Defaults to `None`.
    update_handler : Optional[Callable[[:class:`dict`], `None`]]
        Callback to be called whenever a playlist updates and new
        Live Channel data is retrieved. Defaults to `None`.

    Attributes
    ----------
    async_client : :class:`SXMClientAsync`
        The wrapped asynchronous client
    is_logged_in : :class:`bool`
        Returns if account is logged into SXM's servers
    is_session_authenticated : :class:`bool`
        Returns if session is valid and ready to use
    sxmak_token : Optional[:class:`str`]
        Token taken from the wrapped client's session cookies
    gup_id : Optional[:class:`str`]
        ``gupId`` taken from the wrapped client's session cookies
    channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account
    favorite_channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account that are marked
        as favorite
    """

    def __init__(
        self,
        username: str,
        password: str,
        region: RegionChoice = RegionChoice.US,
        quality: QualitySize = QualitySize.LARGE_256k,
        user_agent: Optional[str] = None,
        update_handler: Optional[Callable[[dict], None]] = None,
    ):
        """Create the wrapped asynchronous client with the given settings."""

        self.async_client = SXMClientAsync(
            username=username,
            password=password,
            region=region,
            quality=quality,
            user_agent=user_agent,
            update_handler=update_handler,
        )

    @property
    def last_renew(self) -> Optional[float]:
        """Monotonic time of the last playlist refresh, if any."""
        return self.async_client.last_renew

    @property
    def password(self) -> str:
        """SXM password of the wrapped client."""
        return self.async_client.password

    @property
    def region(self) -> RegionChoice:
        """Account region of the wrapped client."""
        return self.async_client.region

    @property
    def update_handler(self) -> Optional[Callable[[dict], None]]:
        """Playlist update callback of the wrapped client."""
        return self.async_client.update_handler

    @property
    def update_interval(self) -> int:
        """Playlist refresh interval of the wrapped client."""
        return self.async_client.update_interval

    @property
    def username(self) -> str:
        """SXM username of the wrapped client."""
        return self.async_client.username

    @property
    def stream_quality(self) -> QualitySize:
        """Stream quality of the wrapped client."""
        return self.async_client.stream_quality

    @property
    def is_logged_in(self) -> bool:
        """Whether the wrapped client is logged in."""
        return self.async_client.is_logged_in

    @property
    def is_session_authenticated(self) -> bool:
        """Whether the wrapped client's session is authenticated."""
        return self.async_client.is_session_authenticated

    @property
    def sxmak_token(self) -> Union[str, None]:
        """Token taken from the wrapped client's session cookies."""
        return self.async_client.sxmak_token

    @property
    def gup_id(self) -> Union[str, None]:
        """``gupId`` taken from the wrapped client's session cookies."""
        return self.async_client.gup_id

    @property
    def channels(self) -> List[XMChannel]:
        """All channels sorted by channel number.

        Downloaded on first access and cached on the wrapped client. An
        empty download is returned without being cached.
        """

        if self.async_client._channels is None:
            raw_channels = self.get_channels()
            if not raw_channels:
                return []

            # Build and sort before publishing, so a parsing failure cannot
            # leave a partial, unsorted list in the shared cache.
            parsed = [XMChannel.from_dict(raw) for raw in raw_channels]
            parsed.sort(key=lambda x: int(x.channel_number))
            self.async_client._channels = parsed

        return self.async_client._channels

    @property
    def favorite_channels(self) -> List[XMChannel]:
        """Channels marked as favorite, cached on the wrapped client."""

        if self.async_client._favorite_channels is None:
            channels = self.channels
            if not channels:
                # The channel list is not cached when it comes back empty,
                # so do not make "no favourites" permanent either.
                return []
            self.async_client._favorite_channels = [
                c for c in channels if c.is_favorite
            ]
        return self.async_client._favorite_channels

    @property
    def configuration(self) -> dict:
        """Configuration components keyed by name, cached on the wrapped client.

        Raises
        ------
        ConfigurationError
            If the configuration request returned no data
        """

        if self.async_client._configuration is None:
            data = self.get_configuration()
            if data is None:
                raise ConfigurationError()

            self.async_client._configuration = self.async_client._extract_configuration(
                data
            )
        return self.async_client._configuration

    @property
    def urls(self) -> Dict[str, str]:
        """Relative URL names mapped to URLs, cached on the wrapped client."""

        if self.async_client._urls is None:
            urls = self.configuration["relativeUrls"]
            self.async_client._urls = self.async_client._extract_urls(urls)
        return self.async_client._urls

    @property
    def primary(self) -> bool:
        """Whether the primary HLS root is selected."""
        return self.async_client._use_primary

    def get_primary_hls_root(self) -> str:
        """Blocking version of ``SXMClientAsync.get_primary_hls_root``."""
        return make_sync(self.async_client.get_primary_hls_root)()

    def get_secondary_hls_root(self) -> str:
        """Blocking version of ``SXMClientAsync.get_secondary_hls_root``."""
        return make_sync(self.async_client.get_secondary_hls_root)()

    def get_hls_root(self) -> str:
        """Blocking version of ``SXMClientAsync.get_hls_root``."""
        return make_sync(self.async_client.get_hls_root)()

    def set_primary(self, value: bool):
        """Select the primary or secondary HLS root on the wrapped client."""
        self.async_client.set_primary(value)

    def login(self) -> bool:
        """Blocking version of ``SXMClientAsync.login``."""
        return make_sync(self.async_client.login)()

    def authenticate(self) -> bool:
        """Blocking version of ``SXMClientAsync.authenticate``."""
        return make_sync(self.async_client.authenticate)()

    def get_configuration(self) -> Optional[Dict[str, Any]]:
        """Blocking version of ``SXMClientAsync.get_configuration``."""
        return make_sync(self.async_client.get_configuration)()

    def get_playlist(self, channel_id: str, use_cache: bool = True) -> Union[str, None]:
        """Blocking version of ``SXMClientAsync.get_playlist``."""
        return make_sync(self.async_client.get_playlist)(
            channel_id=channel_id, use_cache=use_cache
        )

    def get_segment(self, path: str) -> Union[bytes, None]:
        """Blocking version of ``SXMClientAsync.get_segment``."""
        return make_sync(self.async_client.get_segment)(path=path)

    def get_channels(self) -> List[dict]:
        """Blocking version of ``SXMClientAsync.get_channels``."""
        return make_sync(self.async_client.get_channels)()

    def get_channel(self, name: str) -> Union[XMChannel, None]:
        """Blocking version of ``SXMClientAsync.get_channel``."""
        return make_sync(self.async_client.get_channel)(name)

    def get_now_playing(self, channel: XMChannel) -> Union[Dict[str, Any], None]:
        """Blocking version of ``SXMClientAsync.get_now_playing``."""
        return make_sync(self.async_client.get_now_playing)(channel)

    def close_session(self):
        """Blocking version of ``SXMClientAsync.close_session``."""
        return make_sync(self.async_client.close_session)()

    def reset_session(self):
        """Replace the wrapped client's HTTP session with a new one."""
        return self.async_client.reset_session()