hthiery/python-fritzhome

View on GitHub
pyfritzhome/fritzhome.py

Summary

Maintainability
C
1 day
Test Coverage
"""The main fritzhome handling class."""
# -*- coding: utf-8 -*-

from __future__ import print_function

import hashlib
import logging
import time
from xml.etree import ElementTree

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from requests import Session

from .errors import InvalidError, LoginError, NotLoggedInError
from .fritzhomedevice import FritzhomeDevice
from .fritzhomedevice import FritzhomeTemplate
from typing import Dict

_LOGGER = logging.getLogger(__name__)


class Fritzhome(object):
    """Fritzhome object to communicate with the device."""

    _sid = None
    _session = None
    _devices: Dict[str, FritzhomeDevice] = None
    _templates: Dict[str, FritzhomeTemplate] = None

    def __init__(self, host, user, password, ssl_verify=True):
        """Create a fritzhome object."""
        self._host = host
        self._user = user
        self._password = password
        self._session = Session()
        self._ssl_verify = ssl_verify

    def _request(self, url, params=None, timeout=10):
        """Send a request with parameters."""
        rsp = self._session.get(
            url, params=params, timeout=timeout, verify=self._ssl_verify
        )
        rsp.raise_for_status()
        return rsp.text.strip()

    def _login_request(self, username=None, secret=None):
        """Send a login request with paramerters."""
        url = self.get_prefixed_host() + "/login_sid.lua?version=2"
        params = {}
        if username:
            params["username"] = username
        if secret:
            params["response"] = secret

        plain = self._request(url, params)
        dom = ElementTree.fromstring(plain)
        sid = dom.findtext("SID")
        blocktime = int(dom.findtext("BlockTime"))
        challenge = dom.findtext("Challenge")

        return (sid, challenge, blocktime)

    def _logout_request(self):
        """Send a logout request."""
        _LOGGER.debug("logout")
        url = self.get_prefixed_host() + "/login_sid.lua"
        params = {"security:command/logout": "1", "sid": self._sid}

        self._request(url, params)

    @staticmethod
    def _create_login_secrete_pbkdf2(challenge, password):
        challenge_parts = challenge.split("$")
        # Extract all necessary values encoded into the challenge
        iter1 = int(challenge_parts[1])
        salt1 = bytes.fromhex(challenge_parts[2])
        iter2 = int(challenge_parts[3])
        salt2 = bytes.fromhex(challenge_parts[4])
        # Hash twice, once with static salt...
        # hash1 = hashlib.pbkdf2_hmac("sha256", password.encode(), salt1, iter1)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(), length=32, salt=salt1, iterations=iter1
        )
        hash1 = kdf.derive(password.encode())
        # Once with dynamic salt.
        # hash2 = hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(), length=32, salt=salt2, iterations=iter2
        )
        hash2 = kdf.derive(hash1)
        return f"{challenge_parts[4]}${hash2.hex()}"

    @staticmethod
    def _create_login_secret_md5(challenge, password):
        """Create a login secret."""
        to_hash = (challenge + "-" + password).encode("UTF-16LE")
        hashed = hashlib.md5(to_hash).hexdigest()
        return "{0}-{1}".format(challenge, hashed)

    def _aha_request(self, cmd, ain=None, param=None, rf=str):
        """Send an AHA request."""
        url = self.get_prefixed_host() + "/webservices/homeautoswitch.lua"

        _LOGGER.debug("self._sid:%s", self._sid)

        if not self._sid:
            raise NotLoggedInError

        params = {"switchcmd": cmd, "sid": self._sid}
        if param:
            params.update(param)
        if ain:
            params["ain"] = ain

        plain = self._request(url, params)
        if plain == "inval":
            raise InvalidError

        if rf == bool:
            return bool(int(plain))
        return rf(plain)

    def login(self):
        """Login and get a valid session ID."""
        (sid, challenge, blocktime) = self._login_request()
        _LOGGER.info("sid:%s, challenge:%s, blocktime:%s", sid, challenge, blocktime)
        if sid == "0000000000000000":
            if blocktime > 0:
                time.sleep(blocktime)
            # PBKDF2 (FRITZ!OS 7.24 or later)
            if challenge.startswith("2$"):
                secret = self._create_login_secrete_pbkdf2(challenge, self._password)
            # fallback to MD5
            else:
                secret = self._create_login_secret_md5(challenge, self._password)
            (sid2, challenge, _) = self._login_request(
                username=self._user, secret=secret
            )
            if sid2 == "0000000000000000":
                _LOGGER.warning("login failed %s", sid2)
                raise LoginError(self._user)
            self._sid = sid2

    def logout(self):
        """Logout."""
        self._logout_request()
        self._sid = None

    def get_prefixed_host(self):
        """Choose the correct protocol prefix for the host.

        Supports three input formats:
        - https://<host>(requests use strict certificate validation by default)
        - http://<host> (unecrypted)
        - <host> (unencrypted)
        """
        host = self._host
        if host.startswith("https://") or host.startswith("http://"):
            return host
        else:
            return "http://" + host

    def update_devices(self, ignore_removed=True):
        """Update the device."""
        _LOGGER.info("Updating Devices ...")
        if self._devices is None:
            self._devices = {}

        device_elements = self.get_device_elements()
        for element in device_elements:
            if element.attrib["identifier"] in self._devices.keys():
                _LOGGER.info(
                    "Updating already existing Device " + element.attrib["identifier"]
                )
                self._devices[element.attrib["identifier"]]._update_from_node(element)
            else:
                _LOGGER.info("Adding new Device " + element.attrib["identifier"])
                device = FritzhomeDevice(self, node=element)
                self._devices[device.ain] = device

        if not ignore_removed:
            for identifier in list(self._devices.keys()):
                if identifier not in [
                    element.attrib["identifier"] for element in device_elements
                ]:
                    _LOGGER.info("Removing no more existing device " + identifier)
                    self._devices.pop(identifier)

        return True

    def _get_listinfo_elements(self, entity_type):
        """Get the DOM elements for the entity list."""
        plain = self._aha_request("get" + entity_type + "listinfos")
        dom = ElementTree.fromstring(plain)
        _LOGGER.debug(dom)
        return dom.findall("*")

    def get_device_elements(self):
        """Get the DOM elements for the device list."""
        return self._get_listinfo_elements("device")

    def get_device_element(self, ain):
        """Get the DOM element for the specified device."""
        elements = self.get_device_elements()
        for element in elements:
            if element.attrib["identifier"] == ain:
                return element
        return None

    def get_devices(self):
        """Get the list of all known devices."""
        return list(self.get_devices_as_dict().values())

    def get_devices_as_dict(self):
        """Get the list of all known devices."""
        if self._devices is None:
            self.update_devices()
        return self._devices

    def get_device_by_ain(self, ain):
        """Return a device specified by the AIN."""
        return self.get_devices_as_dict()[ain]

    def get_device_present(self, ain):
        """Get the device presence."""
        return self._aha_request("getswitchpresent", ain=ain, rf=bool)

    def get_device_name(self, ain):
        """Get the device name."""
        return self._aha_request("getswitchname", ain=ain)

    def get_switch_state(self, ain):
        """Get the switch state."""
        return self._aha_request("getswitchstate", ain=ain, rf=bool)

    def set_switch_state_on(self, ain):
        """Set the switch to on state."""
        return self._aha_request("setswitchon", ain=ain, rf=bool)

    def set_switch_state_off(self, ain):
        """Set the switch to off state."""
        return self._aha_request("setswitchoff", ain=ain, rf=bool)

    def set_switch_state_toggle(self, ain):
        """Toggle the switch state."""
        return self._aha_request("setswitchtoggle", ain=ain, rf=bool)

    def get_switch_power(self, ain):
        """Get the switch power consumption."""
        return self._aha_request("getswitchpower", ain=ain, rf=int)

    def get_switch_energy(self, ain):
        """Get the switch energy."""
        return self._aha_request("getswitchenergy", ain=ain, rf=int)

    def get_temperature(self, ain):
        """Get the device temperature sensor value."""
        return self._aha_request("gettemperature", ain=ain, rf=float) / 10.0

    def _get_temperature(self, ain, name):
        plain = self._aha_request(name, ain=ain, rf=float)
        return (plain - 16) / 2 + 8

    def get_target_temperature(self, ain):
        """Get the thermostate target temperature."""
        return self._get_temperature(ain, "gethkrtsoll")

    def set_target_temperature(self, ain, temperature):
        """Set the thermostate target temperature."""
        temp = int(16 + ((float(temperature) - 8) * 2))

        if temp < min(range(16, 56)):
            temp = 253
        elif temp > max(range(16, 56)):
            temp = 254

        self._aha_request("sethkrtsoll", ain=ain, param={"param": temp})

    def set_window_open(self, ain, seconds):
        """Set the thermostate target temperature."""
        endtimestamp = int(time.time() + seconds)

        self._aha_request(
            "sethkrwindowopen", ain=ain, param={"endtimestamp": endtimestamp}
        )

    def set_boost_mode(self, ain, seconds):
        """Set the thermostate to boost mode."""
        endtimestamp = int(time.time() + seconds)

        self._aha_request("sethkrboost", ain=ain, param={"endtimestamp": endtimestamp})

    def get_comfort_temperature(self, ain):
        """Get the thermostate comfort temperature."""
        return self._get_temperature(ain, "gethkrkomfort")

    def get_eco_temperature(self, ain):
        """Get the thermostate eco temperature."""
        return self._get_temperature(ain, "gethkrabsenk")

    def get_device_statistics(self, ain):
        """Get device statistics."""
        plain = self._aha_request("getbasicdevicestats", ain=ain)
        return plain

    # Lightbulb-related commands

    def set_state_off(self, ain):
        """Set the switch/actuator/lightbulb to on state."""
        self._aha_request("setsimpleonoff", ain=ain, param={"onoff": 0})

    def set_state_on(self, ain):
        """Set the switch/actuator/lightbulb to on state."""
        self._aha_request("setsimpleonoff", ain=ain, param={"onoff": 1})

    def set_state_toggle(self, ain):
        """Toggle the switch/actuator/lightbulb state."""
        self._aha_request("setsimpleonoff", ain=ain, param={"onoff": 2})

    def set_level(self, ain, level):
        """Set level/brightness/height in interval [0,255]."""
        if level < 0:
            level = 0  # 0%
        elif level > 255:
            level = 255  # 100 %

        self._aha_request("setlevel", ain=ain, param={"level": int(level)})

    def set_level_percentage(self, ain, level):
        """Set level/brightness/height in interval [0,100]."""
        if level < 0:
            level = 0
        elif level > 100:
            level = 100

        self._aha_request("setlevelpercentage", ain=ain, param={"level": int(level)})

    def _get_colordefaults(self, ain):
        plain = self._aha_request("getcolordefaults", ain=ain)
        return ElementTree.fromstring(plain)

    def get_colors(self, ain):
        """Get colors (HSV-space) supported by this lightbulb."""
        colordefaults = self._get_colordefaults(ain)
        colors = {}
        for hs in colordefaults.iter("hs"):
            name = hs.find("name").text.strip()
            values = []
            for st in hs.iter("color"):
                values.append((st.get("hue"), st.get("sat"), st.get("val")))
            colors[name] = values
        return colors

    def set_color(self, ain, hsv, duration=0, mapped=True):
        """Set hue and saturation.

        hsv: HUE colorspace element obtained from get_colors()
        duration: Speed of change in seconds, 0 = instant
        """
        params = {
            "hue": int(hsv[0]),
            "saturation": int(hsv[1]),
            "duration": int(duration) * 10,
        }
        if mapped:
            self._aha_request("setcolor", ain=ain, param=params)
        else:
            # undocumented API method for free color selection
            self._aha_request("setunmappedcolor", ain=ain, param=params)

    def get_color_temps(self, ain):
        """Get temperatures supported by this lightbulb."""
        colordefaults = self._get_colordefaults(ain)
        temperatures = []
        for temp in colordefaults.iter("temp"):
            temperatures.append(temp.get("value"))
        return temperatures

    def set_color_temp(self, ain, temperature, duration=0):
        """Set color temperature.

        temperature: temperature element obtained from get_temperatures()
        duration: Speed of change in seconds, 0 = instant
        """
        params = {"temperature": int(temperature), "duration": int(duration) * 10}
        self._aha_request("setcolortemperature", ain=ain, param=params)

    # blinds
    # states: open, close, stop
    def _set_blind_state(self, ain, state):
        self._aha_request("setblind", ain=ain, param={"target": state})

    def set_blind_open(self, ain):
        """Set the blind state to open."""
        self._set_blind_state(ain, "open")

    def set_blind_close(self, ain):
        """Set the blind state to close."""
        self._set_blind_state(ain, "close")

    def set_blind_stop(self, ain):
        """Set the blind state to stop."""
        self._set_blind_state(ain, "stop")

    # Template-related commands

    def has_templates(self):
        """Check if the Fritz!Box supports smarthome templates."""
        plain = self._aha_request("gettemplatelistinfos")
        try:
            ElementTree.fromstring(plain)
        except ElementTree.ParseError:
            return False
        return True

    def update_templates(self, ignore_removed=True):
        """Update the template."""
        _LOGGER.info("Updating Templates ...")
        if self._templates is None:
            self._templates = {}

        template_elements = self.get_template_elements()
        for element in template_elements:
            if element.attrib["identifier"] in self._templates.keys():
                _LOGGER.info(
                    "Updating already existing Template " + element.attrib["identifier"]
                )
                self._templates[element.attrib["identifier"]]._update_from_node(element)
            else:
                _LOGGER.info("Adding new Template " + element.attrib["identifier"])
                template = FritzhomeTemplate(self, node=element)
                self._templates[template.ain] = template

        if not ignore_removed:
            for identifier in list(self._templates.keys()):
                if identifier not in [
                    element.attrib["identifier"] for element in template_elements
                ]:
                    _LOGGER.info("Removing no more existing template " + identifier)
                    self._templates.pop(identifier)

        return True

    def get_template_elements(self):
        """Get the DOM elements for the template list."""
        return self._get_listinfo_elements("template")

    def get_templates(self):
        """Get the list of all known templates."""
        return list(self.get_templates_as_dict().values())

    def get_templates_as_dict(self):
        """Get the list of all known templates."""
        if self._templates is None:
            self.update_templates()
        return self._templates

    def get_template_by_ain(self, ain):
        """Return a template specified by the AIN."""
        return self.get_templates_as_dict()[ain]

    def apply_template(self, ain):
        """Appliy a template."""
        self._aha_request("applytemplate", ain=ain)