krygal/samsung_multiroom

View on GitHub
samsung_multiroom/api/api.py

Summary

Maintainability
F
4 days
Test Coverage
A
92%
"""Low level api to communicate with samsung multiroom speaker."""
import inspect
import logging
import urllib.parse

import requests

from .api_response import ApiResponse
from .api_stream import ApiStream

METHOD_GET = 'get'

COMMAND_UIC = 'UIC'
COMMAND_CPM = 'CPM'

_LOGGER = logging.getLogger(__name__)


class SamsungMultiroomApiException(Exception):
    """Generic API exception."""


class SamsungMultiroomApi:
    """
    Samsung Multiroom Api implementation.

    Contains non-inclusive list of API calls you can make to control the speaker.
    """

    def __init__(self, user, ip_address, port=55001, timeout=5):
        """
        Initialise endpoint.

        :param user: User identifier to pass along with request
        :param ip_address: IP address of the speaker to connect to
        :param port: Port to use, defaults to 55001
        :param timeout: Timeout in seconds
        """
        self._user = user
        self._ip_address = ip_address
        self._port = port
        self._endpoint = 'http://{0}:{1}'.format(ip_address, port)
        self._timeout = timeout

    @property
    def ip_address(self):
        """
        :returns: IP address of the host
        """
        return self._ip_address

    @property
    def port(self):
        """
        :returns: Port to use
        """
        return self._port

    def request(self, method, command, payload):
        """
        Makes a request to a configured endpoint.

        :param method: HTTP method to use
        :param command: UIC|CPM
        :param payload: XML string with payload
        :returns: Response dict
        :raises: ValueError
        :raises: SamsungMultiroomApiException
        """
        if method not in [METHOD_GET]:
            raise ValueError('Invalid method {0}, must be one of METHOD_* constants'.format(method))

        if command not in [COMMAND_UIC, COMMAND_CPM]:
            raise ValueError('Invalid command {0}, must be one of COMMAND_* constants'.format(method))

        url = '{0}/{1}?cmd={2}'.format(self._endpoint, command, urllib.parse.quote(payload))
        headers = {
            'mobileUUID': self._user,
            'mobileName': 'Wireless Audio',
            'mobileVersion': '1.0',
        }

        try:
            _LOGGER.debug('Request %s. Raw payload %s', url, payload)
            response = requests.get(url, headers=headers, timeout=self._timeout)

            return self._parse_response_text(response.text)
        except requests.exceptions.RequestException as request_exception:
            _LOGGER.error('Request %s failed', url, exc_info=1)
            raise SamsungMultiroomApiException('Request {0} failed'.format(url)) from request_exception

    def _parse_response_text(self, response_text):
        _LOGGER.debug('Response %s', response_text)

        response = ApiResponse(response_text)

        if not response.success:
            raise SamsungMultiroomApiException('Received invalid response {0}'.format(response.raw))

        return response.data

    def get(self, command, action, params=None):
        """
        Generic GET request.

        :param command: COMMAND_* constant
        :param action: Get action name to execute e.g. GetVolume
        :param params: List of tuples (name, value, (optional) type hint str/dec/cdata)
        :returns dict
        """
        return self.request(METHOD_GET, command, format_payload(action, params))

    def get_speaker_name(self):
        """
        :returns: Speaker name
        """
        return self.get(COMMAND_UIC, 'GetSpkName')['spkname']

    def set_speaker_name(self, name):
        """
        Set speaker name.

        :param name: new speaker name
        """
        self.get(COMMAND_UIC, 'SetSpkName', [('spkname', name, 'cdata')])

    def get_main_info(self):
        """
        Get main information about speaker.

        :returns: Dict:
            - party - off
            - partymain - None
            - grouptype - one of M, S, N (Master, Slave, None?)
            - groupmainip - if speaker is in group, this is IP address of the master speaker
            - groupmainmacaddr - if speaker is in group, this is MAC address of the master speaker
            - spkmacaddr - this speaker's MAC address
            - spkmodelname - this speaker's model
            - groupmode - aasync, none
            - channeltype - front, invalid
            - channelvolume - 0
            - multichinfo - on
            - groupspknum - total number of speakers in the group
            - dfsstatus - dfsoff
            - protocolver - 2.3
            - btmacaddr - bluetooth MAC address
        """
        path = '/{0}?cmd={1}'.format(COMMAND_UIC, urllib.parse.quote(format_payload('GetMainInfo')))

        stream = ApiStream(self._user, self._ip_address, self._port, self._timeout)

        # Speaker sends two http responses for this request, latter one contains correct payload. We attempt
        # to fetch both responses and read/parse both.
        for response in stream.open(path):
            if not response.success:
                stream.close()
                _LOGGER.error('Request http://%s:%s%s failed', self._ip_address, self._port, path, exc_info=1)
                raise SamsungMultiroomApiException('Request http://{0}:{1}{2} failed'.format(
                    self._ip_address, self._port, path))

            if response.name == 'MainInfo':
                stream.close()
                return response.data

        _LOGGER.error('Request http://%s:%s%s failed', self._ip_address, self._port, path, exc_info=1)
        raise SamsungMultiroomApiException('Request http://{0}:{1}{2} failed'.format(
            self._ip_address, self._port, path))

    def get_volume(self):
        """
        Get current volume level between 0 and 100.

        :returns: int - volume level
        """
        return int(self.get(COMMAND_UIC, 'GetVolume')['volume'])

    def set_volume(self, volume):
        """
        Set speaker volume level.

        :param volume: Volume level between 0 and 100
        """
        self.get(COMMAND_UIC, 'SetVolume', [('volume', int(volume))])

    def get_mute(self):
        """
        Get mute state of the speaker.

        :returns: boolean True if muted
        """
        return on_off_bool(self.get(COMMAND_UIC, 'GetMute')['mute'])

    def set_mute(self, mute):
        """
        Mute/unmute the speaker.

        :param mute: boolean True to mute
        """
        self.get(COMMAND_UIC, 'SetMute', [('mute', bool_on_off(mute))])

    def get_func(self):
        """
        Retrieve current source for the speaker.

        :returns: Dict with
            - function - aux|bt|hdmi|optical|soundshare|wifi
            - submode - dlna|cp|?
        """
        return self.get(COMMAND_UIC, 'GetFunc')

    def set_func(self, function):
        """
        Set the source for the speaker

        :param function: aux|bt|hdmi|optical|soundshare|wifi
        """
        self.get(COMMAND_UIC, 'SetFunc', [('function', function)])

    def get_shuffle_mode(self):
        """
        Retrieve currently set shuffle mode.

        :returns: Boolean True if shuffle mode is enabled
        """
        return on_off_bool(self.get(COMMAND_UIC, 'GetShuffleMode')['shuffle'])

    def set_shuffle_mode(self, shuffle_mode):
        """
        Enable/disable shuffle mode of the playlist.

        :param shuffle_mode: boolean
        """
        self.get(COMMAND_UIC, 'SetShuffleMode', [('shufflemode', bool_on_off(shuffle_mode))])

    def set_trick_mode(self, trick_mode):
        """
        Move to next/previous track on the playlist.

        :param trick_mode: previous|next
        """
        if trick_mode not in ['previous', 'next']:
            raise ValueError('Trick mode must one of: previous, next')

        self.get(COMMAND_UIC, 'SetTrickMode', [('trickmode', trick_mode)])

    def set_playback_control(self, playback_control):
        """
        Pause/resume current playlist.

        :param playback_control: resume|pause|play
        """
        if playback_control not in ['resume', 'pause']:
            raise ValueError('Playback control must be one of: resume, pause')

        self.get(COMMAND_UIC, 'SetPlaybackControl', [('playbackcontrol', playback_control)])

    def get_music_info(self):
        """
        Get detailed information about current track on the playlist.

        :returns: Dict with
            - device_udn - source device unique identifier
            - playertype - allshare
            - playbacktype - folder|playlist
            - sourcename
            - parentid
            - parentid2
            - objectid
            - title
            - artist
            - album
            - thumbnail
            - timelength - HH:MM:SS.uuu format
            - playtime - in microseconds
            - seek - enable|?
            - pause - enable|?
        """
        return self.get(COMMAND_UIC, 'GetMusicInfo')

    def get_play_status(self):
        """
        Get information about play/pause status for the playlist.

        :returns: Dict with
            - function - wifi|?
            - submode - dlna|cp
            - playstatus - (optional) play|pause
        """
        return self.get(COMMAND_UIC, 'GetPlayStatus')

    def set_search_time(self, play_time):
        """
        Set current track to play from a specific time.

        API call will fail unless track is currently played.

        Setting play time further than length of the track ends the current track and plays next from the beginning.

        :play_time: play time in seconds
        """
        self.get(COMMAND_UIC, 'SetSearchTime', [('playtime', int(play_time))])

    def get_preset_list(self, start_index, list_count):
        """
        Get list of predefined radios.

        :param start_index: Starting position to retrieve
        :param list_count: Total number of items to retrieve
        :returns: List of dicts
            - title
            - description
            - thumbnail
            - contentid
            - mediaid
        """
        response = self.get(COMMAND_CPM, 'GetPresetList', [
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ])

        if not int(response['listcount']):
            return []

        return response_list(response['presetlist']['preset'])

    def get_radio_info(self):
        """
        Retrieves currently selected radio info and play status.

        :returns: Dict
            - cpname - TuneIn|Unknown|?
            - playstatus - stop|play
            Optionally:
            - root - preset category
            - presetindex -  position on the preset list
            - title - name of the radio
            - description - description of the radio, can include the currently played song
            - thumbnail - URL of the radio thumbnail image
            - mediaid
            - allowfeedback
            - timestamp - in ISO 8601 format
            - noqueue
        """
        return self.get(COMMAND_CPM, 'GetRadioInfo')

    def set_play_preset(self, preset_type, preset_index):
        """
        Select radio of a particular index.

        Combine this with set_select_radio to play selected radio.

        :param preset_type: 1 - speaker, 0 - my
        :param preset_index: Index of get preset list
        """
        self.get(COMMAND_CPM, 'SetPlayPreset', [
            ('presettype', int(preset_type)),
            ('presetindex', int(preset_index)),
        ])

    def set_select_radio(self):
        """
        Play selected preset preset.

        Precede this with set_play_preset.
        """
        self.get(COMMAND_CPM, 'SetSelectRadio')

    def get_dms_list(self, start_index, list_count):
        """
        Retrieve list of DLNA compatible devices to use as a media source.

        :param start_index:
        :param list_count:
        :returns: List of DLNA devices dicts
            - @device_id - likely a sequential id
            - dmsid - device udn e.g. uuid:00113249-398f-0011-8f39-8f3949321100
            - dmsname - device name e.g. nas
            - devicetype - e.g. network
            - thumbnail_PNG_LRG - thumbnail url
            - thumbnail_JPG_LRG - thumbnail url
            - thumbnail_PNG_SM - thumbnail url
            - thumbnail_JPG_SM - thumbnail url
        """
        response = self.get(COMMAND_UIC, 'GetDmsList', [
            ('liststartindex', int(start_index)),
            ('listcount', int(list_count)),
        ])

        if not int(response['listcount']):
            return []

        return response_list(response['dmslist']['dms'])

    def pc_get_music_list_by_category(self, device_udn, start_index, list_count):
        """
        Browse containers at the root of the DLNA device.

        :param device_udn: dmsid returned by get_dms_list()
        :param start_index:
        :param list_count:
        :returns: List of category dicts
            - @object_id - use it to browse into this container
            - type - usually CONTAINER
            - playindex - -1 for CONTAINERs
            - name - folder name
            - title - None
            - artist - None
            - album - None
            - thumbnail - None
            - timelength - None
            - device_udn -
        """
        response = self.get(COMMAND_UIC, 'PCGetMusicListByCategory', [
            ('device_udn', device_udn),
            ('filter', 'folder'),
            ('categoryid', 'folder'),
            ('liststartindex', int(start_index)),
            ('listcount', int(list_count)),
        ])

        if not int(response['listcount']):
            return []

        return response_list(response['musiclist']['music'])

    def pc_get_music_list_by_id(self, device_udn, parent_id, start_index, list_count):
        """
        Browse containers/audio items in the container of the DLNA device.

        :param device_udn: dmsid returned by get_dms_list()
        :param parent_id: object_id as returned by pc_get_music_list_by_category or this method
        :param start_index:
        :param list_count:
        :returns: List of containers/audio items dicts
            - @object_id - e.g. 22$@52941
            - type - CONTAINER|AUDIO
            - playindex - -1 for CONTAINER, sequential for AUDIO
            - name - folder/file name
            - title
            - artist
            - album
            - thumbnail - URL
            - timelength - HH:MM:SS.xxx format
            - device_udn -
        """
        response = self.get(COMMAND_UIC, 'PCGetMusicListByID', [
            ('device_udn', device_udn),
            ('filter', 'folder'),
            ('parentid', str(parent_id)),
            ('liststartindex', int(start_index)),
            ('listcount', int(list_count)),
        ])

        if not int(response['listcount']):
            return []

        return response_list(response['musiclist']['music'])

    def set_playlist_playback_control(self, items):
        """
        Create a playlist and playback.

        Use pc_get_music_list_by_id() to fetch item information required for playlist item.

        :param items: List of dicts:
            - device_udn
            - object_id
            - title - song title
            - artist - song artist
            - thumbnail - URL
        """
        params = [
            ('playbackcontrol', 'play'),
            ('playertype', 'allshare'),
            ('sourcename', '', 'cdata'),
            ('playindex', 0),
            ('playtime', 0),
            ('totalobjectcount', len(items)),
        ]

        for item in items:
            if 'title' not in item:
                item['title'] = 'Unknown'
            if 'artist' not in item:
                item['artist'] = 'Unknown'
            if 'thumbnail' not in item:
                item['thumbnail'] = ''

            params.append(('device_udn', item['device_udn']))
            params.append(('objectid', item['object_id']))
            params.append(('songtitle', item['title'], 'cdata'))
            params.append(('thumbnail', item['thumbnail'], 'cdata'))
            params.append(('artist', item['artist'], 'cdata'))

        self.get(COMMAND_UIC, 'SetPlaylistPlaybackControl', params)

    def browse_main(self, start_index, list_count):
        """
        Browse radios from the root.

        :param start_index:
        :param list_count:
        :returns: radio list item dict:
            folder:
            - @type - 0 - folder, 2 - radio
            - title - name of the folder
            - contentid - pass to get_select_radio_list() to browse into this folder
            radio:
            - @type - 0 - folder, 2 - radio
            - title - name of the radio
            - description - radio description
            - mediaid
            - thumbnail - URL
            - contentid
        """
        params = [
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'BrowseMain', params)

        if not int(response['listcount']):
            return []

        return response_list(response['menulist']['menuitem'])

    def get_select_radio_list(self, content_id, start_index, list_count):
        """
        Browse specific radio folder.

        Note: you can't browse arbitrary folder. In order to browse a folder you had to navigate to it in a previous
        browse_main() or get_select_radio_list() call. This limitation is imposed by the speaker.

        :param content_id: contentid as returned from browse_main() or get_select_radio_list()
        :param start_index:
        :param list_count:
        :returns: see browse_main()
        """
        params = [
            ('contentid', int(content_id)),
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'GetSelectRadioList', params)

        if not int(response['listcount']):
            return []

        return response_list(response['menulist']['menuitem'])

    def get_current_radio_list(self, start_index, list_count):
        """
        Browse previously browsed radio folder.

        :param start_index:
        :param list_count:
        :returns: see browse_main()
        """
        params = [
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'GetCurrentRadioList', params)

        if not int(response['listcount']):
            return []

        return response_list(response['menulist']['menuitem'])

    def get_upper_radio_list(self, start_index, list_count):
        """
        Browse parent of a browsed radio folder.

        :param start_index:
        :param list_count:
        :returns: see browse_main()
        """
        params = [
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'GetUpperRadioList', params)

        if not int(response['listcount']):
            return []

        return response_list(response['menulist']['menuitem'])

    def set_play_select(self, content_ids):
        """
        Plays selected radio or app item.

        :param content_ids: Content id as returned by get_upper_radio_list(), get_select_radio_list() or
            get_current_radio_list(), or list of content ids
        """
        if not isinstance(content_ids, list):
            content_ids = [content_ids]

        if len(content_ids) > 1:
            params = [('selectitemids', [int(id) for id in content_ids])]
        elif content_ids:
            params = [('selectitemid', int(content_ids[0]))]
        else:
            params = [('selectitemid', '')]

        self.get(COMMAND_CPM, 'SetPlaySelect', params)

    def get_station_data(self, content_id):
        """
        Get radio station data.

        :param content_id: Content id as returned by get_upper_radio_list(), get_select_radio_list() or
            get_current_radio_list()
        :returns: Station data dict:
            - cpname - likely TuneIn
            - title
            - description
            - thumbnail
            - stationurl
            - browsemode
            - timestamp
        """
        params = [('selectitemid', int(content_id))]

        return self.get(COMMAND_CPM, 'GetStationData', params)

    def get_7band_eq_list(self):
        """
        Retrieve equalizer presets.

        :returns: List of preset dicts with following attributes:
            - @index
            - presetindex
            - presetname
        """
        response = self.get(COMMAND_UIC, 'Get7BandEQList')

        return response_list(response['presetlist']['preset'])

    def get_current_eq_mode(self):
        """
        Retrieve current equalizer settings.

        :returns: Preset dict with following attributes:
            - presetindex
            - presetname
            - eqvalue1
            - eqvalue2
            - eqvalue3
            - eqvalue4
            - eqvalue5
            - eqvalue6
            - eqvalue7
        """
        response = self.get(COMMAND_UIC, 'GetCurrentEQMode')

        return response

    def set_7band_eq_value(self, preset_index, values):
        """
        Set preset's equalizer settings.

        Note, this doesn't overwrite preset settings, this method only sets those values temporarily. To overwrite use
        reset_7band_eq_value() method.

        :param preset_index:
        :param values: List of 7 integers ranging between -6 and 6
        """
        params = [('presetindex', int(preset_index))]

        for i, value in enumerate(values):
            params.append(('eqvalue' + str(i + 1), int(value)))

        self.get(COMMAND_UIC, 'Set7bandEQValue', params)

    def set_7band_eq_mode(self, preset_index):
        """
        Switch equalizer to a predefined preset.

        :param preset_index:
        """
        params = [('presetindex', int(preset_index))]

        self.get(COMMAND_UIC, 'Set7bandEQMode', params)

    def reset_7band_eq_value(self, preset_index, values):
        """
        Overwrite preset's equalizer settings.

        :param preset_index:
        :param values: List of 7 integers ranging between -6 and 6
        """
        params = [('presetindex', int(preset_index))]

        for i, value in enumerate(values):
            params.append(('eqvalue' + str(i + 1), int(value)))

        self.get(COMMAND_UIC, 'Reset7bandEQValue', params)

    def del_custom_eq_mode(self, preset_index):
        """
        Delete custom preset.

        Note, you cannot delete predefined presets with indices between 0 and 3 inculsive.

        :param preset_index:
        """
        params = [('presetindex', int(preset_index))]

        self.get(COMMAND_UIC, 'DelCustomEQMode', params)

    def add_custom_eq_mode(self, preset_index, preset_name):
        """
        Creates a new custom preset, using currently set equilizer values.

        Use set_7band_eq_value() or set_7band_eq_mode() to set equilizer values.

        It also allows to overwrite existing custom preset with current equilizer values.

        :param presetindex:
        :param presetname:
        """
        params = [('presetindex', int(preset_index)), ('presetname', preset_name)]

        self.get(COMMAND_UIC, 'AddCustomEQMode', params)

    def set_speaker_time(self, datetime):
        """
        Set speaker's internal time

        :param datetime: Datetime object e.g. datetime.datetime.now()
        """
        params = [
            ('year', datetime.year),
            ('month', datetime.month),
            ('day', datetime.day),
            ('hour', datetime.hour),
            ('min', datetime.minute),
            ('sec', datetime.second),
        ]

        self.get(COMMAND_UIC, 'SetSpeakerTime', params)

    def get_sleep_timer(self):
        """
        Get sleep timer settings
        :returns: Timer settings dict with following attributes:
            - sleepoption - off|start
            - sleeptime - remaining time in seconds
        """
        return self.get(COMMAND_UIC, 'GetSleepTimer')

    def set_sleep_timer(self, option, time):
        """
        Put speaker into sleep mode after specific time
        :param option: off|start
        :param time: delay in seconds
        """
        params = [
            ('option', option),
            ('sleeptime', int(time)),
        ]

        self.get(COMMAND_UIC, 'SetSleepTimer', params)

    def get_alarm_info(self):
        """
        Get list of set alarms.

        :returns: List of dicts:
            - @index - alarm id
            - hour - hour part of alarm
            - min - minutes part of alarm
            - week - hex of days flags Sun Mon Tue Wed Thu Fri Sat, e.g. for weekdays 00111110 - 0x3E
            - volume - volume of alarm
            - title - radio station title
            - description - radio station description
            - thumbnail - radio station thumbnail
            - stationurl - radio station URL
            - set - on|off whether alarm is active or not
            - soundenable - on|off whether predefined sound is used or not
            - sound - on|off whether predefined sound is used or not
            - alarmsoundname - name of predefined alarm sound names as returned by get_alarm_sound_list()
            - duration - duration of alarm in seconds
        """
        response = self.get(COMMAND_UIC, 'GetAlarmInfo')

        return response_list(response['alarmList']['alarm'])

    def set_alarm_on_off(self, index, alarm):
        """
        Enable/disable alarm.

        :param index: Alarm index
        :param alarm: on|off
        """
        params = [
            ('index', int(index)),
            ('alarm', alarm),
        ]

        self.get(COMMAND_UIC, 'SetAlarmOnOff', params)

    def get_alarm_sound_list(self):
        """
        Get list of predefined alarm sounds to use for alarm.

        :returns: List of dicts:
            - @index - alarm sound index
            - alarsoundindex - (note misspelling) alarm sound index
            - alarmsoundname - alarm sound name
        """
        response = self.get(COMMAND_UIC, 'GetAlarmSoundList')

        return response_list(response['alarmlist']['alarmsound'])

    def set_alarm_info(self, index, hour, minute, week, duration, volume, station_data):
        """
        Create alarm.

        Note, you can only create 3 alarms with indices 0, 1, and 2.

        :param index: Alarm index
        :param hour: Alarm hour
        :param minute: Alarm minute
        :param week: hex of days flags Sun Mon Tue Wed Thu Fri Sat, e.g. for weekdays 00111110 - 0x3E
        :param duration: Alarm duration in seconds
        :param volume: Alarm volume 0-100
        :param station_data: Dict as returned by get_station_data
            - title
            - description
            - thumbnail
            - stationurl
        """
        params = [
            ('index', int(index)),
            ('hour', int(hour)),
            ('min', int(minute)),
            ('week', hex(int(week, 16))),
            ('volume', int(volume)),
            ('title', station_data['title'], 'cdata'),
            ('description', station_data['description'], 'cdata'),
            ('thumbnail', station_data['thumbnail'], 'cdata'),
            ('stationurl', station_data['stationurl'], 'cdata'),
            ('soundenable', 'off'),
            ('sound', -1),
            ('duration', int(duration)),
        ]

        self.get(COMMAND_UIC, 'SetAlarmInfo', params)

    def del_alarm(self, index_list):
        """
        Delete alarm(s).

        Note, speaker's have a limit of 3 alarms and only the first 3 indices will be accepted for deletion while
        remaining will be ignored. This restriction is set on the speaker itself.

        :param index_list: List of alarm indices as returned by get_alarm_info()
        """
        params = [
            ('totaldelnum', len(index_list)),
        ]
        params += [('index', int(i)) for i in index_list]

        self.get(COMMAND_UIC, 'DelAlarm', params)

    def spk_in_group(self, action):
        """
        ???

        :param action: select|?
        """
        params = [('act', action)]

        return self.get(COMMAND_UIC, 'SpkInGroup', params)

    def set_multispk_group(self, name, speakers):
        """
        Group speakers.

        :param name: Group's name
        :param speakers: List of speakers (first one will be treated as main/control one). Dict:
            - name
            - ip
            - mac
        """
        params = [('name', name, 'cdata'), ('index', 1), ('type', 'main'), ('spknum', len(speakers))]

        for i, speaker in enumerate(speakers):
            if i == 0:
                params += [
                    ('audiosourcemacaddr', speaker['mac']),
                    ('audiosourcename', speaker['name'], 'cdata'),
                    ('audiosourcetype', 'speaker'),
                ]
            else:
                params += [
                    ('subspkip', speaker['ip']),
                    ('subspkmacaddr', speaker['mac']),
                ]

        self.get(COMMAND_UIC, 'SetMultispkGroup', params)

    def set_ungroup(self):
        """
        Ungroup speakers.
        """
        self.get(COMMAND_UIC, 'SetUngroup')

    def set_group_name(self, name):
        """
        Update speaker's group name.

        :param name: new name
        """
        params = [('groupname', name, 'cdata')]

        self.get(COMMAND_UIC, 'SetGroupName', params)

    def get_cp_list(self, start_index, list_count):
        """
        Get list of speakers app integrations.

        :returns: List of dicts:
            - cpid - id of app service
            - cpname - service name
            - signinstatus - 0/1
            - username - (optionally if signed in) signed in user name
            - istrial_user - (optionally) 1
        """
        params = [
            ('liststartindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'GetCpList', params)

        return response_list(response['cplist']['cp'])

    def set_cp_service(self, cp_id):
        """
        Switch to a specific cp service.

        It also initiates playback of that service.

        :param cp_id: Cp service id as returned by get_cp_list()
        """
        params = [('cpservice_id', int(cp_id))]

        self.get(COMMAND_CPM, 'SetCpService', params)

    def get_cp_info(self):
        """
        Get info about currently active cp service.

        :returns: Dict
            - cpname
            - timestamp - ISO format
            - category
            - signinstatus - 0/1
            - username
            - subscription_info
            - audioinfo - Dict
                - title
                - streamtype - station|?
                - thumbnail - thumbnail url
                - playstatus - play|pause
            },
        """
        return self.get(COMMAND_CPM, 'GetCpInfo')

    def set_sign_in(self, username, password):
        """
        Authenticate with the currently active service.

        :param username: Service username
        :param password: Service password
        """
        params = [
            ('username', username),
            ('password', password),
        ]

        self.get(COMMAND_CPM, 'SetSignIn', params)

    def set_sign_out(self):
        """
        Sign out from the currently active service.

        You need to be authenticated for this call to be successful.
        """
        self.get(COMMAND_CPM, 'SetSignOut')

    def get_cp_submenu(self):
        """
        Get list of top level service categories.

        :returns: List of dicts:
            - @id - id of the menu item
            - submenuitem_localized - menu item name
        """
        response = self.get(COMMAND_CPM, 'GetCpSubmenu')

        return response_list(response['submenu']['submenuitem'])

    def set_select_cp_submenu(self, content_id, start_index, list_count):
        """
        Get list of sub categories.

        Note, some items are autoplayable. By calling this method you might initiate playback.

        :param content_id: parent id as returned by get_cp_submenu() or this method
        :returns: List of dicts
            - @type
            - title
            - contentid
        """
        params = [
            ('contentid', int(content_id)),
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'SetSelectCpSubmenu', params)

        return response_list(response['menulist']['menuitem'])

    def get_cp_player_playlist(self, start_index, list_count):
        """
        Get currently active service playlist.

        Note, some services have limit of items returned regardless of list_count passed.

        :param start_index:
        :param list_count:
        :returns: List of dicts
            - @type - 1
            - @available - 0/1
            - @currentplaying - (optional) 1 if present
            - artist
            - album
            - mediaid - unique
            - tracklength - track length in seconds
            - title
            - contentid
            - thumbnail - thumbnail url

        """
        params = [
            ('startindex', int(start_index)),
            ('listcount', int(list_count)),
        ]

        response = self.get(COMMAND_CPM, 'GetCpPlayerPlaylist', params)

        return response_list(response['menulist']['menuitem'])

    def set_skip_current_track(self):
        """
        Skip current track and play next item on the playlist.
        """
        self.get(COMMAND_CPM, 'SetSkipCurrentTrack')

    def get_current_play_time(self):
        """
        Get info about current track playback position and length.

        :returns: Dict
            - tracklength - track length in seconds
            - playtime - playback position in seconds
        """
        return self.get(COMMAND_UIC, 'GetCurrentPlayTime')

    def set_play_cp_playlist_track(self, item_id):
        """
        Advance playback to specific track on the playlist.

        :param item_id: Item id as returned by get_cp_player_playlist()
        """
        params = [('selectitemid', int(item_id))]

        return self.get(COMMAND_CPM, 'SetPlayCpPlaylistTrack', params)

    def get_repeat_mode(self):
        """
        Get playback repeat mode.

        :returns: string one|all|off for repeat one track, all tracks on the playlist, or disabled repeat mode
        """
        response = self.get(COMMAND_UIC, 'GetRepeatMode')

        return response['repeat']

    def set_repeat_mode(self, mode):
        """
        Set playback repeat mode.

        :param mode: string one|all|off for repeat one track, all tracks on the playlist, or disabled repeat mode
        """
        params = [('repeatmode', mode)]

        self.get(COMMAND_UIC, 'SetRepeatMode', params)


def on_off_bool(value):
    """Convert on/off to True/False correspondingly."""
    return value == 'on'


def bool_on_off(value):
    """Convert True/False to on/off correspondingly."""
    return 'on' if value else 'off'


def response_list(input_list):
    """xmltodict returns different structure if there's one item on the list."""
    if isinstance(input_list, dict):
        input_list = [input_list]

    return input_list


def format_action(action):
    """Format request action."""
    return '<name>{0}</name>'.format(action)


def format_param(param):
    """
    Format request parameter.

    :param param: Tuple e.g. ('list_count', 30, 'dec')
        - name - name of the param
        - value - mixed value
        - type_hint - (optionally) str|dec|cdata|dec_arr
    """
    (name, value, *attributes) = param

    if not attributes:
        type_hint = 'str'
        if isinstance(value, int):
            type_hint = 'dec'
        if isinstance(value, list) and value and isinstance(value[0], int):
            type_hint = 'dec_arr'
    else:
        type_hint = attributes[0]

    if type_hint == 'cdata':
        return '<p type="{0}" name="{1}" val="empty"><![CDATA[{2}]]></p>'.format(type_hint, name, value)
    if type_hint == 'dec_arr':
        value = ''.join(['<item>{0}</item>'.format(v) for v in value])
        return '<p type="{0}" name="{1}" val="empty">{2}</p>'.format(type_hint, name, value)

    return '<p type="{0}" name="{1}" val="{2}"/>'.format(type_hint, name, value)


def format_payload(action, params=None):
    """Format full request payload."""
    payload = format_action(action)
    if params:
        for param in params:
            payload += format_param(param)

    return payload


def paginator(*args):
    """
    Generator to paginate over api call.

    Api method must accept start_index and list_count parameters.

    :param: callable function to use for pagination
    :param: optionally pass second function that will be used for subsequent pages
    :param: pass all initial values that first callable function accepts, they will be replicated to a second callable
    :returns: Iterable
    """
    if not callable(args[0]):
        raise ValueError('First argument must be a function')

    primary = args[0]
    secondary = args[0]
    args = args[1:]

    if callable(args[0]):
        secondary = args[0]
        args = args[1:]

    primary_parameters = _get_callable_parameters(primary)
    secondary_parameters = _get_callable_parameters(secondary)

    # match primary_parameters with args
    primary_kwargs = {}
    secondary_kwargs = {}

    for i, parameter in enumerate(primary_parameters):
        primary_kwargs[parameter] = args[i]
        if parameter in secondary_parameters:
            secondary_kwargs[parameter] = args[i]

    current = primary
    current_kwargs = primary_kwargs
    has_more = True

    while has_more:
        items = current(**current_kwargs)
        for item in items:
            yield item

        has_more = len(items) >= current_kwargs['list_count']

        current = secondary
        current_kwargs = secondary_kwargs
        current_kwargs['start_index'] = current_kwargs['start_index'] + current_kwargs['list_count']


def _get_callable_parameters(arg_callable):
    return inspect.signature(arg_callable).parameters.keys()