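# Source: kontron/python-ipmi (view on GitHub)
# File:   pyipmi/hpm.py
#
# Summary (code quality report captured with this listing)
#   Maintainability: F, 3 days
#   Test Coverage:   (no value captured)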
# Copyright (c) 2014  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA

from __future__ import print_function

import os
import codecs
import struct
import collections
import hashlib
import time

from array import array

from .errors import CompletionCodeError, HpmError, IpmiTimeoutError
from .msgs import create_request_by_name
from .msgs import constants
from .utils import check_completion_code, bcd_search, chunks
from .utils import py3dec_unic_bytes_fix, py3_array_tobytes
from .state import State
from .fields import VersionField


# Property selectors for the 'GetComponentProperties' request; they are
# passed as the `selector` field by Hpm.get_component_property and are used
# by ComponentProperty.from_data to pick the decoder class.
PROPERTY_GENERAL_PROPERTIES = 0
PROPERTY_CURRENT_VERSION = 1
PROPERTY_DESCRIPTION_STRING = 2
PROPERTY_ROLLBACK_VERSION = 3
PROPERTY_DEFERRED_VERSION = 4
# NOTE(review): range(192, 255) ends at 254, so selector 255 is not treated
# as OEM here -- confirm whether that is intended.
PROPERTY_OEM = list(range(192, 255))

# Upgrade action types accepted by Hpm.initiate_upgrade_action.
ACTION_BACKUP_COMPONENT = 0x00
ACTION_PREPARE_COMPONENT = 0x01
ACTION_UPLOAD_FOR_UPGRADE = 0x02
ACTION_UPLOAD_FOR_COMPARE = 0x03

# Completion code that makes the *_and_wait helpers of Hpm poll the upgrade
# status instead of failing.
CC_LONG_DURATION_CMD_IN_PROGRESS = 0x80

# Command specific completion codes, grouped by the request named in the
# constant prefix.  Only CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR is
# referenced by the code visible in this module.
CC_GET_COMP_PROP_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81
CC_GET_COMP_PROP_INVALID_COMPONENT = 0x82
CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR = 0x83

CC_INITIATE_UPGRADE_CMD_IN_PROGRESS = 0x80
CC_INITIATE_UPGRADE_INVALID_COMPONENT = 0x81

CC_QUERY_SELFTEST_COMPLETED = 0x00
CC_QUERY_SELFTEST_IN_PROGRESS = 0x80
CC_QUERY_SELFTEST_UPGRADE_NOT_SUPPORTED_OVER_INTF = 0x81
CC_QUERY_SELFTEST_NO_RESULTS_AVAILABLE = 0xD5

CC_ABORT_UPGRADE_CANNOT_ABORT = 0x80
CC_ABORT_UPGRADE_CANNOT_RESUME_OPERATION = 0x81


class Hpm(object):

    @staticmethod
    def _get_component_count(components):
        """Return the number of components."""
        return bin(components).count('1')

    def get_target_upgrade_capabilities(self):
        """Request the upgrade capabilities of the target.

        Sends the 'GetTargetUpgradeCapabilities' request and returns the
        response wrapped in a TargetUpgradeCapabilities object.
        """
        request_name = 'GetTargetUpgradeCapabilities'
        response = self.send_message_with_name(request_name)
        return TargetUpgradeCapabilities(response)

    def get_component_property(self, component_id, property_id):
        """Read one property of a component.

        Sends 'GetComponentProperties' for `component_id` with
        `property_id` as selector and decodes the response data through
        ComponentProperty.from_data.
        """
        request_fields = {'id': component_id, 'selector': property_id}
        response = self.send_message_with_name('GetComponentProperties',
                                               **request_fields)
        return ComponentProperty.from_data(property_id, response.data)

    def get_component_properties(self, component_id):
        """Collect the standard properties of a component.

        Queries the general, current version, description, rollback
        version and deferred version properties in that order and returns
        the decoded property objects as a list.  A property whose query
        ends with a CompletionCodeError is left out of the result.
        """
        selectors = (
            PROPERTY_GENERAL_PROPERTIES,
            PROPERTY_CURRENT_VERSION,
            PROPERTY_DESCRIPTION_STRING,
            PROPERTY_ROLLBACK_VERSION,
            PROPERTY_DEFERRED_VERSION,
        )
        collected = []
        for selector in selectors:
            try:
                prop = self.get_component_property(component_id, selector)
            except CompletionCodeError as e:
                if e.cc == CC_GET_COMP_PROP_INVALID_PROPERTIES_SELECTOR:
                    # The target does not know this selector.
                    continue
                # NOTE(review): every other completion code is dropped here
                # as well -- confirm that this is intended.
                continue
            if prop is not None:
                collected.append(prop)
        return collected

    def find_component_id_by_descriptor(self, descriptor):
        """Look up a component by its description string.

        Walks over the components reported by the target upgrade
        capabilities and returns the id of the first component whose
        description equals `descriptor`; returns None if none matches.
        """
        capabilities = self.get_target_upgrade_capabilities()
        for candidate in capabilities.components:
            prop = self.get_component_property(candidate,
                                               PROPERTY_DESCRIPTION_STRING)
            if prop is None:
                continue
            if prop.description == descriptor:
                return candidate
        return None

    def abort_firmware_upgrade(self):
        self.send_message_with_name('AbortFirmwareUpgrade')

    def initiate_upgrade_action(self, components_mask, action):
        """Send the 'InitiateUpgradeAction' request.

        components_mask: bit mask selecting the affected components.
        action: one of
            ACTION_BACKUP_COMPONENT = 0x00
            ACTION_PREPARE_COMPONENT = 0x01
            ACTION_UPLOAD_FOR_UPGRADE = 0x02
            ACTION_UPLOAD_FOR_COMPARE = 0x03

        Raises HpmError if an upload action is requested and the mask does
        not select exactly one component.
        """
        upload_actions = (ACTION_UPLOAD_FOR_UPGRADE, ACTION_UPLOAD_FOR_COMPARE)
        is_upload = action in upload_actions
        if is_upload and self._get_component_count(components_mask) != 1:
            raise HpmError("more than 1 component not support for action")

        request_fields = {'components': components_mask, 'action': action}
        self.send_message_with_name('InitiateUpgradeAction', **request_fields)

    def initiate_upgrade_action_and_wait(self, components_mask, action,
                                         timeout=2, interval=0.1):
        """Initiate an upgrade action and wait if it runs as long command.

        If the request is answered with CC_LONG_DURATION_CMD_IN_PROGRESS
        the upgrade status is polled via wait_for_long_duration_command
        using `timeout` and `interval`.  Any other completion code error
        is turned into an HpmError.
        """
        try:
            self.initiate_upgrade_action(components_mask, action)
        except CompletionCodeError as e:
            if e.cc != CC_LONG_DURATION_CMD_IN_PROGRESS:
                raise HpmError('initiate_upgrade_action CC=0x%02x' % e.cc)
            expected_cmd = constants.CMDID_HPM_INITIATE_UPGRADE_ACTION
            self.wait_for_long_duration_command(expected_cmd,
                                                timeout, interval)

    def upload_firmware_block(self, block_number, data):
        if isinstance(data, str):
            data = [ord(c) for c in data]

        self.send_message_with_name('UploadFirmwareBlock', number=block_number,
                                    data=data)

    @staticmethod
    def _determine_max_block_size():
        return 22

    def upload_binary(self, binary, timeout=2, interval=0.1, retry=3):
        """Upload all firmware blocks from a binary.

        The binary is split into chunks of _determine_max_block_size()
        bytes; each chunk is sent with an 8 bit block number that wraps
        around after 0xff.

        binary: the firmware image data.
        timeout, interval: passed to wait_for_long_duration_command when a
            block is answered with CC_LONG_DURATION_CMD_IN_PROGRESS.
        retry: total number of IPMI timeouts tolerated during the whole
            upload; the timeout that uses up the budget is re-raised.

        Raises HpmError for any other completion code and IpmiTimeoutError
        once the retry budget is exhausted.
        """
        block_number = 0
        block_size = self._determine_max_block_size()

        for chunk in chunks(binary, block_size):
            while True:
                try:
                    self.upload_firmware_block(block_number, chunk)
                except CompletionCodeError as e:
                    if e.cc != CC_LONG_DURATION_CMD_IN_PROGRESS:
                        raise HpmError(
                            'upload_firmware_block CC=0x%02x' % e.cc)
                    self.wait_for_long_duration_command(
                        constants.CMDID_HPM_UPLOAD_FIRMWARE_BLOCK,
                        timeout, interval)
                except IpmiTimeoutError:
                    retry -= 1
                    if retry <= 0:
                        raise
                    # Send the very same block again.  Moving on to the
                    # next block number would leave a hole in the image if
                    # the request never reached the target.
                    continue
                break

            block_number = (block_number + 1) & 0xff

    def finish_firmware_upload(self, component, length):
        return self.send_message_with_name('FinishFirmwareUpload',
                                           component_id=component,
                                           image_length=length)

    def finish_upload_and_wait(self, component, length,
                               timeout=2, interval=0.1):
        """Finish the firmware upload and wait for its completion.

        The completion code of the response is checked.  On
        CC_LONG_DURATION_CMD_IN_PROGRESS the upgrade status is polled via
        wait_for_long_duration_command; any other completion code error is
        turned into an HpmError.
        """
        try:
            response = self.finish_firmware_upload(component, length)
            check_completion_code(response.completion_code)
        except CompletionCodeError as e:
            if e.cc != CC_LONG_DURATION_CMD_IN_PROGRESS:
                raise HpmError('finish_firmware_upload CC=0x%02x' % e.cc)
            expected_cmd = constants.CMDID_HPM_FINISH_FIRMWARE_UPLOAD
            self.wait_for_long_duration_command(expected_cmd,
                                                timeout, interval)

    def get_upgrade_status(self):
        """Send 'GetUpgradeStatus' and return it as UpgradeStatus object."""
        response = self.send_message_with_name('GetUpgradeStatus')
        return UpgradeStatus(response)

    def wait_for_long_duration_command(self, expected_cmd, timeout, interval):
        """Poll the upgrade status until a long duration command is done.

        expected_cmd: command id that is expected to be in progress.
        timeout: maximum time to poll, in seconds.
        interval: pause between two polls, in seconds.

        Returns as soon as the reported last completion code differs from
        CC_LONG_DURATION_CMD_IN_PROGRESS.  IpmiTimeoutError and IOError
        raised by the status query are treated as "not ready yet".
        NOTE: when `timeout` elapses the method returns silently as well;
        the caller cannot tell both cases apart.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                status = self.get_upgrade_status()
            except (IpmiTimeoutError, IOError):
                # The target did not answer; try again after a pause.
                time.sleep(interval)
                continue

            # Compare by value, not by identity: command ids are integers.
            # NOTE(review): 0x34 is presumably the GetUpgradeStatus command
            # id -- confirm against the message constants.
            if status.command_in_progress not in (expected_cmd, 0x34):
                # A different command is reported as in progress.  No action
                # is taken for this case; the check is kept as placeholder.
                pass

            if status.last_completion_code != CC_LONG_DURATION_CMD_IN_PROGRESS:
                return
            time.sleep(interval)

    def activate_firmware(self, rollback_override=None):
        """Send the 'ActivateFirmware' request.

        rollback_override: if not None it is stored in the request's
            rollback_override_policy field.

        The completion code of the response is checked with
        check_completion_code.
        """
        request = create_request_by_name('ActivateFirmware')
        override_requested = rollback_override is not None
        if override_requested:
            request.rollback_override_policy = rollback_override
        response = self.send_message(request)
        check_completion_code(response.completion_code)

    def activate_firmware_and_wait(self, rollback_override=None,
                                   timeout=2, interval=1):
        """Activate the uploaded firmware and wait for the command to end.

        On CC_LONG_DURATION_CMD_IN_PROGRESS the upgrade status is polled
        via wait_for_long_duration_command; any other completion code
        error is turned into an HpmError.  An IpmiTimeoutError is ignored.
        """
        try:
            self.activate_firmware(rollback_override)
        except CompletionCodeError as e:
            if e.cc != CC_LONG_DURATION_CMD_IN_PROGRESS:
                raise HpmError('activate_firmware CC=0x%02x' % e.cc)
            expected_cmd = constants.CMDID_HPM_ACTIVATE_FIRMWARE
            self.wait_for_long_duration_command(expected_cmd,
                                                timeout, interval)
        except IpmiTimeoutError:
            # No answer: the controller is in reset and flashes the new
            # firmware.
            pass

    def query_selftest_results(self):
        """Send 'QuerySelftestResults' and return a SelfTestResult."""
        response = self.send_message_with_name('QuerySelftestResults')
        return SelfTestResult(response)

    def query_rollback_status(self):
        """Send 'QueryRollbackStatus' and return a RollbackStatus."""
        response = self.send_message_with_name('QueryRollbackStatus')
        return RollbackStatus(response)

    def initiate_manual_rollback(self):
        """Send 'InitiateManualRollback' and return a RollbackStatus."""
        response = self.send_message_with_name('InitiateManualRollback')
        return RollbackStatus(response)

    def initiate_manual_rollback_and_wait(self, timeout=2, interval=0.1):
        """Initiate a manual rollback and wait for the command to end.

        On CC_LONG_DURATION_CMD_IN_PROGRESS the upgrade status is polled
        via wait_for_long_duration_command; any other completion code
        error is turned into an HpmError.  An IpmiTimeoutError is ignored.

        NOTE: `timeout` is currently not used; polling is limited by a
        fixed value of 60 seconds.  It is kept that way because honouring
        the default of 2 seconds would shorten the existing wait.
        """
        try:
            self.initiate_manual_rollback()
        except CompletionCodeError as e:
            if e.cc == CC_LONG_DURATION_CMD_IN_PROGRESS:
                self.wait_for_long_duration_command(
                            constants.CMDID_HPM_INITIATE_MANUAL_ROLLBACK,
                            60, interval)
            else:
                # Name the command that actually failed (the message used
                # to say 'activate_firmware').
                raise HpmError('initiate_manual_rollback CC=0x%02x' % e.cc)
        except IpmiTimeoutError:
            # No answer: the controller is in reset while it switches the
            # firmware.
            pass

    @staticmethod
    def open_upgrade_image(filename):
        """Load the upgrade image file `filename` as UpgradeImage."""
        image = UpgradeImage(filename)
        return image

    @staticmethod
    def get_upgrade_version_from_file(filename):
        """Return the firmware version contained in an upgrade image file.

        The version is taken from the first "upload for upgrade" action
        record of the image; None is returned if there is no such record.
        """
        upgrade_image = UpgradeImage(filename)
        versions = (record.firmware_version
                    for record in upgrade_image.actions
                    if isinstance(record, UpgradeActionRecordUploadForUpgrade))
        return next(versions, None)

    @staticmethod
    def _do_upgrade_action_backup(image):
        for action in image.actions:
            if isinstance(action, UpgradeActionRecordBackup):
                pass

    @staticmethod
    def _do_upgrade_action_prepare(image):
        for action in image.actions:
            if isinstance(action, UpgradeActionRecordPrepare):
                print("do ACTION_PREPARE_COMPONENT")

    @staticmethod
    def _do_upgrade_action_upload(image):
        for action in image.actions:
            if isinstance(action, UpgradeActionRecordUploadForUpgrade):
                print("do ACTION_UPLOAD_FOR_UPGRADE")

    def preparation_stage(self, image):
        ####################################################
        # match device ID, manfuacturer ID, etc.
        device_id = self.get_device_id()

        header = image.header

        if header.device_id != device_id.device_id:
            raise HpmError('Device ID: image=0x%x device=0x%x'
                           % (header.device_id, device_id.device_id))
        if header.manufacturer_id != device_id.manufacturer_id:
            raise HpmError('Manufacturer ID: image=0x%x device=0x%x'
                           % (header.manufacturer_id,
                              device_id.manufacturer_id))
        if header.product_id != device_id.product_id:
            raise HpmError('Product ID: image=0x%x device=0x%x'
                           % (header.product_id, device_id.product_id))

        # tbd check version

        ####################################################
        # compare current revision with upgrade image earlist comp rev
        targetCap = self.get_target_upgrade_capabilities()
        # tbd check version

        ####################################################
        # Match IPM Controller capabilities with Upgrade Image capabilities
        support = False
        for imageComponent in header.components:
            if imageComponent in targetCap.components:
                support = True

        if support is not True:
            raise HpmError('no supported component in image')

    def upgrade_stage(self, image, component):
        for action in image.actions:
            if action.components & (1 << component) == 0:
                continue
            self.initiate_upgrade_action_and_wait(1 << component,
                                                  action.action_type)
            if isinstance(action, UpgradeActionRecordUploadForUpgrade):
                self.upload_binary(action.firmware_image_data)
                self.finish_upload_and_wait(component, action.firmware_length)

    def _activation_state_do_self_testing(self):
        pass

    def wait_until_new_firmware_comes_up(self, timeout, interval):
        """Poll the target while the new firmware starts.

        For `timeout` seconds the upgrade status and the device id are
        requested every `interval` seconds; IpmiTimeoutError and IOError
        are ignored because the controller may not answer while it
        restarts.  After the polling period the method sleeps another 5
        seconds.

        The loop always runs for the whole `timeout`, also when the target
        answers.  The pause is applied after successful polls too, so that
        a responsive target is not flooded with requests.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                self.get_upgrade_status()
                self.get_device_id()
            except (IpmiTimeoutError, IOError):
                # Expected while the controller is not reachable.
                pass
            time.sleep(interval)
        time.sleep(5)

    def activation_stage(self, image, component):
        self.activate_firmware_and_wait(
            image.header.inaccessibility_timeout, 1)
        self.wait_until_new_firmware_comes_up(
            image.header.inaccessibility_timeout, 1)
        self._activation_state_do_self_testing()

    def install_component_from_image(self, image, component):
        self.abort_firmware_upgrade()
        if component not in image.header.components:
            raise HpmError('component=%d not in image' % component)
        self.preparation_stage(image)
        self.upgrade_stage(image, component)
        self.activation_stage(image, component)

    def install_component_from_file(self, filename, component):
        """Load the image file `filename` and install `component` from it."""
        self.install_component_from_image(UpgradeImage(filename), component)


class UpgradeStatus(State):
    """Upgrade status: command in progress and its last completion code."""

    def _from_response(self, rsp):
        """Copy the status fields from a response object."""
        self.last_completion_code = rsp.last_completion_code
        self.command_in_progress = rsp.command_in_progress

    def __str__(self):
        values = (self.command_in_progress, self.last_completion_code)
        return "cmd=0x%02x cc=0x%02x" % values


class TargetUpgradeCapabilities(State):
    """Upgrade capabilities: HPM.1 version and list of component ids."""

    def _from_response(self, rsp):
        """Decode the version and the component presence bit mask."""
        self.version = rsp.hpm_1_version
        # Bit i of component_present set -> component id i is present.
        self.components = [bit for bit in range(8)
                           if rsp.component_present & (1 << bit)]

    def __str__(self):
        return "\n".join((
            "Target Upgrade Capabilities",
            " HPM.1 version: %s" % self.version,
            " Components: %s" % self.components,
        ))


# Register the codec search function imported from .utils with the codecs
# module; this happens as a side effect of importing this module.
# NOTE(review): presumably this makes a BCD codec available -- confirm in
# .utils.
codecs.register(bcd_search)


class ComponentProperty(object):
    """Base class of the decoded component properties.

    Subclasses implement _from_rsp_data(data) to decode the response data
    of a 'GetComponentProperties' request.
    """

    def __init__(self, data=None):
        # Empty data (None, empty list, ...) leaves the object undecoded.
        if data:
            self._from_rsp_data(data)

    @staticmethod
    def from_data(component_id, data):
        """Create the property object matching a property selector.

        component_id: the property selector (one of the PROPERTY_*
            values); the parameter name is kept for compatibility.
        data: response data; a `str` is converted to a list of character
            codes first.

        Returns the decoded property object, or None for a selector that
        is not known.  Raises NotImplementedError for OEM selectors.
        """
        if isinstance(data, str):
            data = [ord(c) for c in data]

        # Selectors are plain numbers: compare by value.  An identity
        # check ('is') only works by accident for small cached integers.
        if component_id == PROPERTY_GENERAL_PROPERTIES:
            return ComponentPropertyGeneral(data)
        elif component_id == PROPERTY_CURRENT_VERSION:
            return ComponentPropertyCurrentVersion(data)
        elif component_id == PROPERTY_DESCRIPTION_STRING:
            return ComponentPropertyDescriptionString(data)
        elif component_id == PROPERTY_ROLLBACK_VERSION:
            return ComponentPropertyRollbackVersion(data)
        elif component_id == PROPERTY_DEFERRED_VERSION:
            return ComponentPropertyDeferredVersion(data)
        elif component_id in PROPERTY_OEM:
            raise NotImplementedError
        return None


class ComponentPropertyGeneral(ComponentProperty):
    """General properties of a component, decoded into a list of names."""

    ROLLBACK_SUPPORT_MASK = 0x03
    PREPARATION_SUPPORT_MASK = 0x04
    COMPARISON_SUPPORT_MASK = 0x08
    DEFERRED_ACTIVATION_SUPPORT_MASK = 0x10
    PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK = 0x20

    def _from_rsp_data(self, data):
        """Decode the capability byte (data[0]) into self.general."""
        cap = data[0]

        # The two lowest bits select one of four rollback states; the
        # values 1 and 2 are both reported with the same name.
        rollback_names = (
            'rollback_backup_not_supported',
            'rollback_is_supported',
            'rollback_is_supported',
            'reserved',
        )
        names = [rollback_names[cap & self.ROLLBACK_SUPPORT_MASK]]

        # The remaining capabilities are single bit flags.
        flag_names = (
            (self.PREPARATION_SUPPORT_MASK, 'prepartion'),
            (self.COMPARISON_SUPPORT_MASK, 'comparison'),
            (self.DEFERRED_ACTIVATION_SUPPORT_MASK, 'deferred_activation'),
            (self.PAYLOAD_COLD_RESET_REQ_SUPPORT_MASK,
             'payload_cold_reset_required'),
        )
        names.extend(name for mask, name in flag_names if cap & mask)

        self.general = names


class ComponentPropertyCurrentVersion(ComponentProperty):
    """Property created for the PROPERTY_CURRENT_VERSION selector."""

    def _from_rsp_data(self, data):
        """Store the response data as VersionField in self.version."""
        decoded_version = VersionField(data)
        self.version = decoded_version


class ComponentPropertyDescriptionString(ComponentProperty):
    """Property created for the PROPERTY_DESCRIPTION_STRING selector."""

    def _from_rsp_data(self, data):
        """Convert the response data to text in self.description.

        All '\\0' characters are removed from the text.
        """
        raw_bytes = py3_array_tobytes(array('B', data))
        text = py3dec_unic_bytes_fix(raw_bytes)
        self.description = text.replace('\0', '')


class ComponentPropertyRollbackVersion(ComponentProperty):
    """Property created for the PROPERTY_ROLLBACK_VERSION selector."""

    def _from_rsp_data(self, data):
        """Store the response data as VersionField in self.version."""
        decoded_version = VersionField(data)
        self.version = decoded_version


class ComponentPropertyDeferredVersion(ComponentProperty):
    """Property created for the PROPERTY_DEFERRED_VERSION selector."""

    def _from_rsp_data(self, data):
        """Store the response data as VersionField in self.version."""
        decoded_version = VersionField(data)
        self.version = decoded_version


class ComponentPropertyOem(ComponentProperty):
    """Container for undecoded OEM property data.

    ComponentProperty.from_data does not create this class; it raises
    NotImplementedError for OEM selectors.
    """

    def _from_rsp_data(self, data):
        """Keep the response data unchanged in self.oem_data."""
        raw_data = data
        self.oem_data = raw_data


class SelfTestResult(State):
    """Self test result: status byte plus decoded failure flags.

    Every fail_* attribute holds 0 or 1 and is always present after
    decoding.  NOTE(review): per the IPMI self test result definition the
    flags are presumably meaningful only if `status` equals
    CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES -- confirm against the spec.
    """

    CORRUPTED_OR_INACCESSIBLE_DATA_OR_DEVICES = 0x57

    def _from_response(self, rsp):
        """Decode selftest_result_1 and selftest_result_2 of a response."""
        self.status = rsp.selftest_result_1

        result2 = rsp.selftest_result_2

        # Decode all eight flags unconditionally.  The four upper flags
        # used to be skipped when status was 0x57, which left the
        # attributes undefined exactly when a failure was reported.
        self.fail_sel = (result2 & 0x80) >> 7
        self.fail_sdrr = (result2 & 0x40) >> 6
        self.fail_bmc_fru = (result2 & 0x20) >> 5
        self.fail_ipmb = (result2 & 0x10) >> 4
        self.fail_sdrr_empty = (result2 & 0x08) >> 3
        # The misspelled attribute name is kept for compatibility.
        self.fail_bmc_fru_interanl_area = (result2 & 0x04) >> 2
        self.fail_bootblock = (result2 & 0x02) >> 1
        self.fail_mc = (result2 & 0x01) >> 0


class RollbackStatus(object):
    """Rollback status decoded from a response object.

    percent_complete holds the completion estimate of the response, or
    None if no response was given or it carries no estimate.
    """

    def __init__(self, rsp=None):
        # Always define the attribute so that readers never hit an
        # AttributeError (formerly it was missing for an estimate of 0).
        self.percent_complete = None
        if rsp is not None:
            self._from_rsp(rsp)

    def _from_rsp(self, rsp):
        """Take over the completion estimate, including a value of 0."""
        estimate = getattr(rsp, 'completion_estimate', None)
        if estimate is not None:
            self.percent_complete = estimate


# Description of one field of the upgrade image header as used by
# UpgradeImageHeaderRecord.FORMAT: attribute name, struct format string,
# start offset within the image data and length in bytes.
image_header = collections.namedtuple('image_header',
                                      ['field_name', 'format', 'start', 'len'])


class UpgradeImageHeaderRecord(object):
    """Header record at the start of an HPM upgrade image.

    The fields listed in FORMAT are unpacked with `struct`; the remaining
    attributes (signature, manufacturer_id, components, the two revision
    fields, oem_data, checksum and length) are decoded by hand.
    """

    FORMAT = [
        image_header('format_version', 'B', 8, 1),
        image_header('device_id', 'B', 9, 1),
        image_header('product_id', '<H', 13, 2),
        image_header('time', '<L', 15, 4),
        image_header('capabilities', 'B', 19, 1),
        image_header('selftest_timeout', 'B', 21, 1),
        image_header('rollback_timeout', 'B', 22, 1),
        image_header('inaccessibility_timeout', 'B', 23, 1),
        image_header('earliest_compatible_revision', '<H', 24, 2),
        image_header('oem_data_length', '<H', 32, 2),
    ]

    def __init__(self, data=None):
        for field in self.FORMAT:
            setattr(self, field.field_name, None)
        if data:
            self._from_data(data)

    def _from_data(self, data):
        """Decode the header from `data`.

        `data` may be longer than the header (e.g. the whole image file);
        self.length tells how many bytes belong to the header.
        """
        self.signature = data[0:8]

        for field in self.FORMAT:
            raw = data[field.start:field.start + field.len]
            setattr(self, field.field_name,
                    struct.unpack(field.format, raw)[0])

        # From here on single bytes are accessed by index, which needs
        # integers: convert a `str` to a list of character codes.
        if isinstance(data, str):
            data = [ord(c) for c in data]

        # 24 bit little endian manufacturer id.
        self.manufacturer_id = data[10] | data[11] << 8 | data[12] << 16
        # Bit i of byte 20 set -> component id i is part of the image.
        self.components = [i for i in range(8) if data[20] & (1 << i)]
        # Replaces the raw value unpacked via FORMAT above.
        self.earliest_compatible_revision = \
            VersionField(data[24:24 + VersionField.VERSION_FIELD_LEN])
        self.firmware_revision = \
            VersionField(data[26:26 + VersionField.VERSION_WITH_AUX_FIELD_LEN])

        # The OEM data covers exactly oem_data_length bytes after the
        # fixed part; it is empty if the length is 0.  (The former slice
        # data[34:-1] took everything up to the end of the buffer.)
        oem_end = 34 + self.oem_data_length
        self.oem_data = data[34:oem_end]
        # XXX checksum check
        self.checksum = data[oem_end]
        self.length = oem_end + 1

    def __str__(self):
        lines = [
            "HPM Upgrade Image header",
            " Signature:        %s" % self.signature,
            " Format Version:   %s" % self.format_version,
            " Device ID:        %s" % self.device_id,
            " Manufacturer:     %s" % self.manufacturer_id,
            " Product ID:       %s" % self.product_id,
            " Time:             %s" % self.time,
            " Image Cap:        0x%02x" % self.capabilities,
            " Components:       %s" % self.components,
            " Selftest Timeout: %s" % self.selftest_timeout,
            " Rollback Timeout: %s" % self.rollback_timeout,
            " Inacc. Timeout:   %s" % self.inaccessibility_timeout,
            " Earliest comp.:   %s" % self.earliest_compatible_revision,
            " firmware Revision:%s" % self.firmware_revision,
            " OEM data len:     %s" % self.oem_data_length,
        ]
        return "\n".join(lines)


class UpgradeActionRecord(object):
    """Common part of an upgrade action record of an HPM upgrade image.

    Attributes after decoding: action_type and action (both the first
    byte), components (bit mask), checksum and length (number of bytes
    consumed by the record; subclasses add their payload).  Without data
    the fields are None and length is 0.
    """

    ACTIONS = (
        "Backup",
        "Prepare",
        "Upload for Upgrade",
        "Upload for Compare"
    )

    def __init__(self, data=None):
        self.action_type = None
        self.action = None
        self.components = None
        self.checksum = None
        self.length = 0
        # Only touch `data` if there is any: the default of None used to
        # raise a TypeError before the check was reached.
        if data:
            (self.action, self.components, self.checksum) \
                = struct.unpack('BBB', data[0:3])
            self.action_type = self.action
            self.length = 3

    @staticmethod
    def create_from_data(data):
        """Create the record object matching the action type in data[0].

        Raises HpmError for an unknown action type.
        """
        # Look at the first byte only; converting the whole remaining
        # buffer just to read one byte is a needless copy.
        action_type = bytearray(data[:1])[0]
        if action_type == ACTION_BACKUP_COMPONENT:
            return UpgradeActionRecordBackup(data)
        elif action_type == ACTION_PREPARE_COMPONENT:
            return UpgradeActionRecordPrepare(data)
        elif action_type == ACTION_UPLOAD_FOR_UPGRADE:
            return UpgradeActionRecordUploadForUpgrade(data)
        elif action_type == ACTION_UPLOAD_FOR_COMPARE:
            return UpgradeActionRecordUploadForCompare(data)
        else:
            raise HpmError('unsupported ActionRecord')

    def __str__(self):
        lines = [
            "Action Record Type: 0x%x (%s) " %
            (self.action, self.ACTIONS[self.action]),
            " Components: 0x%02x" % self.components,
        ]
        return "\n".join(lines)


class UpgradeActionRecordBackup(UpgradeActionRecord):
    """Action record created for ACTION_BACKUP_COMPONENT; no extra data."""


class UpgradeActionRecordPrepare(UpgradeActionRecord):
    """Action record created for ACTION_PREPARE_COMPONENT; no extra data."""


class UpgradeActionRecordUploadForUpgrade(UpgradeActionRecord):
    """Action record carrying a firmware image to upload.

    Besides the common fields it provides firmware_version,
    firmware_description_string, firmware_length and firmware_image_data;
    length covers the record header and the firmware data.
    """

    def __init__(self, data=None):
        UpgradeActionRecord.__init__(self, data)
        if not data:
            return

        version_end = 3 + VersionField.VERSION_WITH_AUX_FIELD_LEN
        self.firmware_version = VersionField(data[3:version_end])
        self.firmware_description_string = py3dec_unic_bytes_fix(data[9:30])
        (self.firmware_length,) = struct.unpack('<L', data[30:34])
        image_end = 34 + self.firmware_length
        self.firmware_image_data = data[34:image_end]
        # 31 bytes of upload specific header follow the 3 common bytes.
        self.length = self.length + 31 + self.firmware_length


class UpgradeActionRecordUploadForCompare(UpgradeActionRecord):
    """Action record created for ACTION_UPLOAD_FOR_COMPARE.

    Only the common fields are decoded, so `length` is 3.
    NOTE(review): if such a record carries firmware data like the upload
    for upgrade record, the image parser would continue inside that data
    -- confirm against the image format.
    """


class ImageChecksumRecord(object):
    """Checksum record of an upgrade image: the first 16 bytes of data."""

    def __init__(self, data=None):
        # Empty or missing data leaves the object without `data`.
        if not data:
            return
        self._from_data(data)

    def _from_data(self, data):
        """Keep the first 16 bytes of `data` in self.data."""
        checksum_bytes = data[:16]
        self.data = checksum_bytes


# Number of bytes at the end of an upgrade image file that hold the image
# checksum (UpgradeImage hashes the data before them with MD5).
HPM_IMAGE_CHECKSUM_SIZE = 16


class UpgradeImage(object):
    """An HPM upgrade image loaded from a file.

    After loading: header (UpgradeImageHeaderRecord), actions (list of
    action records), checksum (ImageChecksumRecord) and the two digests
    checksum_actual / checksum_expected.  Without a filename only
    `actions` exists and is None.
    """

    def __init__(self, filename=None):
        self.actions = None

        if filename:
            self._from_file(filename)

    def __str__(self):
        lines = []
        return "\n".join(lines)

    def _check_md5_sum(self, filedata):
        """Compute the MD5 digest of the image and fetch the stored one.

        checksum_actual is the digest over everything except the trailing
        checksum bytes, checksum_expected are those trailing bytes.  The
        two values are not compared here.
        """
        summer = hashlib.md5()
        # update() returns None; the digest has to be read separately.
        summer.update(filedata[:-HPM_IMAGE_CHECKSUM_SIZE])
        self.checksum_actual = summer.digest()
        self.checksum_expected = filedata[-HPM_IMAGE_CHECKSUM_SIZE:]

    def _from_file(self, filename):
        """Read and parse the image file `filename`.

        Raises IOError if the file cannot be opened or read.
        """
        # The context manager closes the file also if reading fails.  An
        # open error is passed on to the caller; it used to be printed
        # and was followed by a failure on the unbound file object.
        with open(filename, "rb") as image_file:
            file_data = image_file.read()

        ################################
        # get image checksum
        self._check_md5_sum(file_data)
        # XXX verify checksum

        ################################
        # Upgrade Image Header
        self.header = UpgradeImageHeaderRecord(file_data)
        off = self.header.length

        ################################
        # Upgrade Actions
        self.actions = []
        while (off + HPM_IMAGE_CHECKSUM_SIZE) < len(file_data):
            action = UpgradeActionRecord.create_from_data(file_data[off:])
            self.actions.append(action)
            off += action.length

        ################################
        # Image checksum
        self.checksum = ImageChecksumRecord(file_data[off:])