CiscoUcs/imcsdk

View on GitHub
imcsdk/imcsession.py

Summary

Maintainability
D
2 days
Test Coverage
# Copyright 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import logging
import os
from threading import Timer

from .imcexception import ImcException, ImcLoginError
from .imccoremeta import ImcVersion
from .imcdriver import ImcDriver
from .imcgenutils import Progress

log = logging.getLogger('imc')


class ImcSession(object):
    """
    ImcSession class is session interface for any Imc related communication.
    Parent class of ImcHandle, used internally by ImcHandle class.
    """

    def __init__(self, ip, username, password, port=None, secure=None,
                 proxy=None, auto_refresh=False, force=False, timeout=None):
        self.__ip = ip
        self.__username = username
        self.__password = password
        self.__proxy = proxy
        self.__auto_refresh = auto_refresh
        self.__timeout = timeout
        self.__uri = self.__create_uri(port, secure)
        self.__starship_proxy = None
        self.__starship_headers = None
        self.__model = None

        self.__imc = ip
        self.__name = None
        self.__cookie = None
        self.__session_id = None
        self.__version = None
        self.__refresh_period = None
        self.__priv = None
        self.__domains = None
        self.__channel = None
        self.__evt_channel = None
        self.__last_update_time = None

        self.__refresh_timer = None
        self.__force = force

        self.__dump_xml = False
        self.__redirect = False
        self.__driver = ImcDriver(proxy=self.__proxy)

        # In debug mode, log the XMLs to a file
        if os.path.exists('/tmp/imcsdk_debug'):
            self.__dump_xml = True

    @property
    def ip(self):
        return self.__ip

    @property
    def username(self):
        return self.__username

    @property
    def proxy(self):
        return self.__proxy

    @property
    def uri(self):
        return self.__uri

    @property
    def imc(self):
        return self.__imc

    @property
    def name(self):
        return self.__name

    @property
    def cookie(self):
        return self.__cookie

    @property
    def session_id(self):
        return self.__session_id

    @property
    def refresh_period(self):
        return self.__refresh_period

    @property
    def priv(self):
        return self.__priv

    @property
    def domains(self):
        return self.__domains

    @property
    def channel(self):
        return self.__channel

    @property
    def evt_channel(self):
        return self.__evt_channel

    @property
    def last_update_time(self):
        return self.__last_update_time

    @property
    def platform(self):
        return self.__platform

    @property
    def model(self):
        return self.__model

    @property
    def version(self):
        return self.__version

    def __create_uri(self, port, secure):
        """
        Generates IMC URI used for connection

        Args:
            port (int or None): The port number to be used during connection
            secure (bool or None): True for secure connection otherwise False

        Returns:
            uri (str)

        Example:
            uri = __create_uri(port=443, secure=True)
        """

        port = _get_port(port, secure)
        protocol = _get_proto(port, secure)

        uri = "%s://%s%s%s" % (protocol, self.__ip, ":", str(port))
        return uri

    def __clear(self):
        """
        Internal method to clear the session variables
        """

        self.__name = None
        self.__cookie = None
        self.__session_id = None
        self.__version = None
        self.__refresh_period = None
        self.__priv = None
        self.__domains = None
        self.__channel = None
        self.__evt_channel = None
        self.__last_update_time = str(time.asctime())

    def __update(self, response):
        """
        Internal method to update the session variables
        """

        from .imccoremeta import ImcVersion
        self.__cookie = response.out_cookie
        self.__session_id = response.out_session_id
        self.__version = ImcVersion(response.out_version)
        self.__refresh_period = int(response.out_refresh_period)
        self.__priv = response.out_priv
        self.__domains = response.out_domains
        self.__channel = response.out_channel
        self.__evt_channel = response.out_evt_channel
        self.__last_update_time = str(time.asctime())

    def post(self, uri, data=None, read=True, timeout=None):
        """
        sends the request and receives the response from the imc server

        Args:
            uri (str): URI of the  the imc Server
            data (str): request data to send via post request

        Returns:
            response xml string

        Example:
            response = post("http://192.168.1.1:80", data=xml_str)
        """

        timeout = timeout if timeout is not None else self.__timeout
        response = self.__driver.post(uri=uri, data=data, read=read, timeout=timeout)
        return response

    def post_xml(self, xml_str, read=True, timeout=None):
        """
        sends the xml request and receives the response from the imc server

        Args:
            xml_str (str): xml string
            read (bool): if True, returns response.read() else returns object.
            timeout (int): if set, this will be used as timeout in secs for urllib2

        Returns:
            response xml string

        Example:
            response = post_xml('<aaaLogin inName="user" inPassword="pass">')
        """

        if self.__starship_proxy is not None:
            self.__uri = self.__starship_proxy
            imc_uri = self.__starship_proxy
        else:
            imc_uri = self.__uri + "/nuova"
        response_str = self.post(uri=imc_uri, data=xml_str, read=read, timeout=timeout)
        if self.__driver.redirect_uri:
            self.__uri = self.__driver.redirect_uri

        return response_str

    def dump_xml_request(self, elem):
        from . import imcxmlcodec as xc
        if not self.__dump_xml:
            return

        if elem.tag == "aaaLogin":
            elem.attrib['inPassword'] = "*********"
            xml_str = xc.to_xml_str(elem)
            log.debug('%s ====> %s' % (self.__uri, xml_str))
            elem.attrib['inPassword'] = self.__password
            xml_str = xc.to_xml_str(elem)
        else:
            xml_str = xc.to_xml_str(elem)
            log.debug('%s ====> %s' % (self.__uri, xml_str))

    def dump_xml_response(self, resp):
        if self.__dump_xml:
            log.debug('%s <==== %s' % (self.__uri, resp))

    def post_elem(self, elem, timeout=None):
        """
        sends the request and receives the response from the imc server using xml
        element

        Args:
            elem (xml element)
            timeout (int): if set, it is used as timeout in secs for urllib2

        Returns:
            response xml string

        Example:
            response = post_elem(elem=xml_element)
        """

        from . import imcxmlcodec as xc

        if self._is_stale_cookie(elem):
            elem.attrib['cookie'] = self.cookie

        self.dump_xml_request(elem)
        xml_str = xc.to_xml_str(elem)

        response_str = self.post_xml(xml_str, timeout=timeout)
        self.dump_xml_response(response_str)

        if response_str:
            response = xc.from_xml_str(response_str, self)

            # Cookie update should happen with-in the lock
            # this ensures that the next packet goes out
            # with the new cookie
            if elem.tag == "aaaRefresh":
                self._update_cookie(response)

            return response

        return None

    def file_download(
            self,
            url_suffix,
            file_dir,
            file_name,
            progress=Progress()):
        """
        Downloads the file from the imc server

        Args:
            url_suffix (str): suffix url to be appended to
                    http\https://host:port/ to locate the file on the server
            file_dir (str): The directory to download to
            file_name (str): The destination file name for the download
            progress (imcgenutils.Progress): Class that has method to display progress

        Returns:
            None

        Example:
            file_download(url_suffix='backupfile/config_backup.xml',
            file_dir='/home/user/backup', file_name='my_config_backup.xml')
        """

        from .imcgenutils import download_file

        file_url = "%s/%s" % (self.__uri, url_suffix)

        self.__driver.add_header('Cookie', 'imc-cookie=%s'
                                 % self.__cookie)

        download_file(driver=self.__driver,
                      file_url=file_url,
                      file_dir=file_dir,
                      file_name=file_name,
                      progress=progress)

        self.__driver.remove_header('Cookie')

    def file_upload(
            self,
            url_suffix,
            file_dir,
            file_name,
            progress=Progress()):
        """
        Uploads the file on IMC server.

        Args:
            url_suffix (str): suffix url to be appended to
                http\https://host:port/ to locate the file on the server
            file_dir (str): The directory to upload from
            file_name (str): The destination file name for the download
            progress (imcgenutils.Progress): Class that has method to display progress

        Returns:
            None

        Example:
            file_dir = "/home/user/backup"\n
            file_name = "config_backup.xml"\n
            uri_suffix = "operations/file-%s/importconfig.txt" % file_name\n
            file_upload(url_suffix=uri_suffix, file_dir=file_dir,
            file_name=file_name)
        """

        from .imcgenutils import upload_file

        file_url = "%s/%s" % (self.__uri, url_suffix)

        self.__driver.add_header('Cookie', 'imc-cookie=%s'
                                 % self.__cookie)

        upload_file(self.__driver,
                    uri=file_url,
                    file_dir=file_dir,
                    file_name=file_name,
                    progress=progress)

        self.__driver.remove_header('Cookie')

    def _start_refresh_timer(self):
        """
        Internal method to support auto-refresh functionality.
        """

        if self.__refresh_period > 60:
            interval = int(self.__refresh_period) - 60
        else:
            interval = 60
        self.__refresh_timer = Timer(interval, self._refresh)
        self.__refresh_timer.setDaemon(True)
        self.__refresh_timer.start()

    def _stop_refresh_timer(self):
        """
        Internal method to support auto-refresh functionality.
        """

        if self.__refresh_timer:
            self.__refresh_timer.cancel()
            self.__refresh_timer = None

    def _update_cookie(self, response):
        if response.error_code != 0:
            return
        self.__cookie = response.out_cookie

    def _is_stale_cookie(self, elem):
        return 'cookie' in elem.attrib and elem.attrib[
            'cookie'] != "" and elem.attrib['cookie'] != self.cookie

    def _refresh(self, auto_relogin=True):
        """
        Sends the aaaRefresh query to the imc to refresh the connection
        (to prevent session expiration).
        """

        from .imcmethodfactory import aaa_refresh

        self._stop_refresh_timer()

        elem = aaa_refresh(self.__cookie,
                           self.__username,
                           self.__password)
        response = self.post_elem(elem)
        if response.error_code != 0:
            self.__cookie = None
            if auto_relogin:
                return self._login(auto_refresh=True)
            return False

        self.__cookie = response.out_cookie
        self.__refresh_period = int(response.out_refresh_period)
        self.__priv = response.out_priv.split(',')
        self.__domains = response.out_domains
        self.__last_update_time = str(time.asctime())

        # re-enable the timer
        self._start_refresh_timer()
        return True

    def _is_fabric_interconnect(self):
        from .imcmethodfactory import config_resolve_class

        nw_elem = config_resolve_class(cookie=self.__cookie,
                                       class_id="networkElement")
        try:
            nw_elem_response = self.post_elem(nw_elem)
            if nw_elem_response.error_code == 0:
                return True
            else:
                return False
        except:
            return False

    def _validate_connection(self):
        """
        Internal method to validate if needs to reconnect or if exist use the
        existing connection.
        """

        from .mometa.top.TopSystem import TopSystem
        from .imcmethodfactory import config_resolve_dn

        if self.__cookie and self.__cookie != "":
            if not self.__force:
                top_system = TopSystem()
                elem = config_resolve_dn(cookie=self.__cookie,
                                         dn=top_system.dn)
                response = self.post_elem(elem)
                if response.error_code != 0:
                    return False
                return True
            else:
                try:
                    self._logout()
                except Exception as e:
                    # Logout can fail when the cookie is stale.
                    # Cookie can be stale when CIMC has restarted.
                    # Empty the cookie here.
                    self.__cookie = None
                    raise
        return False

    def _validate_model(self, model):
        valid_model_prefixes = ["UCSC", "UCS-E", "UCSS", "HX", "APIC-SERVER-", "DN1", "DN2", "DN3"]
        valid_models = ["R460-4640810", "C260-BASE-2646"]

        if model in valid_models:
            return True

        for prefix in valid_model_prefixes:
            if model.startswith(prefix):
                return True

        return False

    def _validate_imc(self):
        """
        This method validates if a given host is a supported IMC server
        """
        from .imcmethodfactory import config_resolve_class

        request = config_resolve_class(cookie=self.__cookie,
                                       class_id="biosUnit")
        response = self.post_elem(request)

        if not response or response.error_code != 0 or \
                len(response.out_configs.child) == 0:
            self.logout()
            return False

        for element in response.out_configs.child:
            model = element.model

            if not self._validate_model(model):
                self._logout()
                return False

            self._set_platform(model=model)
            self._set_model(model=model)

        return True

    def _update_version(self, response=None):
        from .imccoremeta import ImcVersion
        from .imcmethodfactory import config_resolve_dn
        from .mometa.top.TopSystem import TopSystem
        from .mometa.firmware.FirmwareRunning import FirmwareRunning, \
            FirmwareRunningConsts

        # If the aaaLogin response has the version populated, we do not
        # need to query for it
        # There are cases where version is missing from aaaLogin response
        # In such cases the later part of this method populates it
        if response.out_version and response.out_version != "":
            return

        top_system = TopSystem()
        firmware = FirmwareRunning(top_system,
                                   FirmwareRunningConsts.DEPLOYMENT_SYSTEM)
        elem = config_resolve_dn(cookie=self.__cookie,
                                 dn=firmware.dn)
        response = self.post_elem(elem)
        if response.error_code != 0:
            raise ImcException(response.error_code,
                               response.error_descr)
        if len(response.out_config.child) > 0:
            firmware = response.out_config.child[0]
            self._set_version(firmware.version)
        else:
            self._set_version("ancient")

    def _update_domain_name_and_ip(self):
        from .imcmethodfactory import config_resolve_dn
        from .mometa.top.TopSystem import TopSystem

        top_system = TopSystem()
        elem = config_resolve_dn(cookie=self.__cookie, dn=top_system.dn)
        response = self.post_elem(elem)
        if response.error_code != 0:
            raise ImcException(response.error_code, response.error_descr)
        top_system = response.out_config.child[0]
        self.__imc = top_system.name
        self.__virtual_ipv4_address = top_system.address

    def _login(self, auto_refresh=None, force=None, timeout=None):
        """
        Internal method responsible to do a login on imc server.

        Args:
            auto_refresh (bool): if set to True, it refresh the cookie
                                    continuously
            force (bool): if set to True it reconnects even if cookie exists
                                    and is valid for respective connection.
            timeout (int): timeout value in secs

        Returns:
            True on successful connect
        """
        from .imcmethodfactory import aaa_login
        from .imccoreutils import add_handle_to_list

        self.__force = force if force is not None else self.__force
        auto_refresh = auto_refresh if auto_refresh is not None else self.__auto_refresh

        if self._validate_connection():
            return True

        elem = aaa_login(in_name=self.__username,
                         in_password=self.__password)
        response = self.post_elem(elem, timeout=timeout)
        if response.error_code != 0:
            self.__clear()
            raise ImcException(response.error_code, response.error_descr)
        self.__update(response)

        # Verify to connect to IMC only
        if not self._validate_imc():
            raise ImcLoginError("Not a supported server.")

        self._update_version(response)
        self._update_domain_name_and_ip()

        if auto_refresh:
            self._start_refresh_timer()

        add_handle_to_list(self)
        return True

    def _logout(self, timeout=None):
        """
        Internal method to disconnect from the imc server.

        Args:
            None

        Returns:
            True on successful disconnect

        """

        from .imcmethodfactory import aaa_logout
        from .imccoreutils import remove_handle_from_list

        if self.__cookie is None:
            return True

        if self.__refresh_timer:
            self.__refresh_timer.cancel()

        elem = aaa_logout(self.__cookie)
        response = self.post_elem(elem, timeout=timeout)

        if response.error_code == "555":
            return True

        if response.error_code != 0:
            raise ImcException(response.error_code,
                               response.error_descr)

        self.__clear()

        remove_handle_from_list(self)
        return True

    def _is_starship(self):
        if self.__starship_proxy:
            return True
        return False

    def _set_starship_proxy(self, proxy):
        """
        Internal method to set proxy URL in starship environment
        """
        self.__starship_proxy = proxy
        self.__driver.__redirect_uri = proxy

    def _set_starship_headers(self, headers):
        """
        Internal method to set proxy URL in starship environment
        """
        self.__starship_headers = headers
        for header in headers:
            self.__driver.add_header(header, headers[header])

        # set cookie for to_xml to work correctly
        if headers["x-barracuda-session"]:
            self.__cookie = headers["x-barracuda-session"]
        else:
            self.__cookie = headers["x-barracuda-apikey"]

    def _set_dump_xml(self):
        """
        Internal method to set dump_xml to True
        """
        self.__dump_xml = True

    def _unset_dump_xml(self):
        """
        Internal method to set dump_xml to False
        """
        self.__dump_xml = False

    def _set_platform(self, model=None, platform=None):
        """
        Internal method to set the platform type
        Not to be exposed at the handle
        """
        from imcsdk.imccoreutils import IMC_PLATFORM
        modular_platform_prefixes = ["UCSC-C3X", "UCSS-S32"]
        if platform:
            self.__platform = platform
        elif model:
            self.__platform = IMC_PLATFORM.TYPE_CLASSIC
            for prefix in modular_platform_prefixes:
                if model.startswith(prefix):
                    self.__platform = IMC_PLATFORM.TYPE_MODULAR

    def _set_version(self, version):
        self.__version = ImcVersion(version)

    def _set_model(self, model, force=False):
        """
        Internal method to set the server model
        Not to be exposed at the handle
        """
        if force and model:
            self.__model = model
            return

        from .imccoreutils import IMC_PLATFORM
        class_ids = {
            IMC_PLATFORM.TYPE_MODULAR: "ComputeServerNode",
            IMC_PLATFORM.TYPE_CLASSIC: "ComputeRackUnit"
            }

        # Check for the right platform
        if self.__platform in class_ids:
            mo_list = self.query_classid(class_ids.get(self.__platform))
            if mo_list:
                self.__model = mo_list[0].model
                return

        # Unknown platform, should not end up here
        # Blindly assign the model passed
        self.__model = model

    def add_header(self, header_prop, header_value):
        self.__driver.add_header(header_prop, header_value)

def _get_port(port, secure):
    if port:
        return int(port)

    if secure is False:
        return 80
    return 443


def _get_proto(port, secure):
    if secure is None:
        if port == "80":
            return "http"
    elif secure is False:
        return "http"
    return "https"