kontron/python-ipmi

View on GitHub
pyipmi/sel.py

Summary

Maintainability
B
4 hrs
Test Coverage

# Copyright (c) 2014  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA

from array import array

from .errors import CompletionCodeError, DecodingError
from .utils import check_completion_code, ByteBuffer
from .msgs import create_request_by_name
from .msgs import constants
from .event import EVENT_ASSERTION, EVENT_DEASSERTION

from .helper import clear_repository_helper
from .state import State


class Sel(object):
    """Mixin implementing the System Event Log (SEL) commands.

    The methods rely on the host class providing
    ``send_message_with_name(name, **kwargs)`` and ``send_message(req)``;
    neither is defined here.
    """

    def get_sel_entries_count(self):
        """Return the ``entries`` value decoded from a 'GetSelInfo' response."""
        info = SelInfo(self.send_message_with_name('GetSelInfo'))
        return info.entries

    def get_sel_reservation_id(self):
        """Issue 'ReserveSel' and return the reservation id of the response."""
        rsp = self.send_message_with_name('ReserveSel')
        return rsp.reservation_id

    def _clear_sel(self, cmd, reservation):
        """Send one 'ClearSel' request and return its erase-in-progress flag.

        Used as the per-step callback handed to ``clear_repository_helper``.
        """
        rsp = self.send_message_with_name('ClearSel',
                                          reservation_id=reservation,
                                          cmd=cmd)
        return rsp.status.erase_in_progress

    def clear_sel(self, retry=5):
        """Clear the SEL by delegating to ``clear_repository_helper``.

        The helper receives the reservation getter, the 'ClearSel' step
        function and *retry*; the exact retry semantics live in the helper.
        """
        clear_repository_helper(self.get_sel_reservation_id,
                                self._clear_sel, retry)

    def delete_sel_entry(self, record_id, reservation=0):
        """Send 'DeleteSelEntry' and return the record id of the response."""
        rsp = self.send_message_with_name('DeleteSelEntry',
                                          reservation_id=reservation,
                                          record_id=record_id)
        return rsp.record_id

    def get_and_clear_sel_entry(self, record_id):
        """Atomically gets and clears the specified SEL record.

        The read and the delete are issued under the same reservation. When
        either one fails with CC_RES_CANCELED, a fresh reservation is taken
        and both steps are repeated; there is no upper bound on the number
        of attempts. Any other CompletionCodeError is propagated.

        Returns the entry as read by ``get_sel_entry``.
        """
        while True:
            reservation = self.get_sel_reservation_id()
            try:
                sel_entry, _ = self.get_sel_entry(record_id, reservation)
                self.delete_sel_entry(record_id, reservation)
            except CompletionCodeError as e:
                if e.cc == constants.CC_RES_CANCELED:
                    # Reservation was lost in between: start over.
                    continue
                raise
            return sel_entry

    def get_sel_entry(self, record_id, reservation=0):
        """Read one SEL record, in several chunks if the device requires it.

        The first request asks for the entire record (length 0xff). Each
        time the device answers CC_CANT_RET_NUM_REQ_BYTES the chunk size is
        reduced (first to 16, then by one per rejection) and the request is
        repeated. Reading continues at increasing offsets until at least 16
        bytes have been collected.

        The chunk size in use is stored in ``self.max_req_len``; it is reset
        at the start of every call.

        Returns a tuple ``(SelEntry, next_record_id)`` where
        ``next_record_id`` is taken from the last response.

        Raises whatever ``check_completion_code`` raises for a failing
        completion code (callers in this class catch CompletionCodeError),
        including CC_CANT_RET_NUM_REQ_BYTES once even a 1-byte read has been
        rejected. Raises DecodingError when a successful response carries no
        data, because the read could then never complete.
        """
        ENTIRE_RECORD = 0xff
        RECORD_LENGTH = 16
        req = create_request_by_name('GetSelEntry')
        req.reservation_id = reservation
        req.record_id = record_id
        req.offset = 0
        self.max_req_len = ENTIRE_RECORD

        record_data = ByteBuffer()

        while True:
            req.length = self.max_req_len
            # In chunked mode never request bytes past the end of the record.
            if (self.max_req_len != ENTIRE_RECORD
                    and (req.offset + req.length) > RECORD_LENGTH):
                req.length = RECORD_LENGTH - req.offset

            rsp = self.send_message(req)
            if rsp.completion_code == constants.CC_CANT_RET_NUM_REQ_BYTES:
                if self.max_req_len == ENTIRE_RECORD:
                    self.max_req_len = RECORD_LENGTH
                    continue
                if self.max_req_len > 1:
                    self.max_req_len -= 1
                    continue
                # One byte is already the smallest possible read. Shrinking
                # further would only produce zero-length or negative
                # requests, so report the completion code instead.
            check_completion_code(rsp.completion_code)

            collected = len(record_data)
            record_data.extend(rsp.record_data)
            if len(record_data) == collected:
                # No progress was made; retrying the same request would
                # loop forever.
                raise DecodingError(
                    'No SEL record data returned at offset %d' % collected)
            req.offset = len(record_data)

            if len(record_data) >= RECORD_LENGTH:
                break

        return (SelEntry(record_data), rsp.next_record_id)

    def sel_entries(self):
        """Generator which returns all SEL entries.

        Yields nothing when the entry count is zero. Otherwise a single
        reservation is taken and records are read starting at record id 0,
        following the ``next_record_id`` of each response until it equals
        0xffff.
        """
        START_SEL_RECORD_ID = 0
        END_SEL_RECORD_ID = 0xffff
        if self.get_sel_entries_count() == 0:
            return

        reservation_id = self.get_sel_reservation_id()
        next_record_id = START_SEL_RECORD_ID
        while True:
            (sel_entry, next_record_id) = self.get_sel_entry(next_record_id,
                                                             reservation_id)
            yield sel_entry
            if next_record_id == END_SEL_RECORD_ID:
                break

    def get_sel_entries(self):
        """Return all SEL entries as a list."""
        return list(self.sel_entries())


class SelInfo(State):
    """Decoded view of a 'GetSelInfo' response.

    Instances are built as ``SelInfo(rsp)``; the ``State`` base class is
    presumably what forwards *rsp* to ``_from_response`` (not visible here).
    """

    def _from_response(self, rsp):
        """Copy the SEL info fields of *rsp* onto this object.

        ``operation_support`` ends up as a list with the name of every flag
        of ``rsp.operation_support`` that is truthy, in a fixed order.
        """
        # Plain values are taken over under the same attribute name.
        for field in ('version', 'entries', 'free_bytes',
                      'most_recent_addition', 'most_recent_erase'):
            setattr(self, field, getattr(rsp, field))

        self.operation_support = []
        for flag in ('get_sel_allocation_info', 'reserve_sel',
                     'partial_add_sel_entry', 'delete_sel',
                     'overflow_flag'):
            if getattr(rsp.operation_support, flag):
                self.operation_support.append(flag)


class SelEntry(State):
    """A single 16-byte SEL record decoded into attributes.

    Instances are built as ``SelEntry(data)``; the ``State`` base class is
    presumably what forwards *data* to ``_from_response`` (not visible here).
    """

    # Record type values accepted by the decoder.
    TYPE_SYSTEM_EVENT = 0x02
    TYPE_OEM_TIMESTAMPED_RANGE = list(range(0xc0, 0xe0))
    TYPE_OEM_NON_TIMESTAMPED_RANGE = list(range(0xe0, 0x100))

    def __str__(self):
        """Return a multi-line dump of the raw bytes and decoded fields."""
        raw = '[%s]' % ' '.join('0x%02x' % octet for octet in self.data)
        lines = [
            'SEL Record ID 0x%04x' % self.record_id,
            '  Raw: %s' % raw,
            '  Type: %d' % self.type,
            '  Timestamp: %d' % self.timestamp,
            '  Generator: %d' % self.generator_id,
            '  EvM rev: %d' % self.evm_rev,
            '  Sensor Type: 0x%02x' % self.sensor_type,
            '  Sensor Number: %d' % self.sensor_number,
            '  Event Direction: %d' % self.event_direction,
            '  Event Type: 0x%02x' % self.event_type,
            '  Event Data: %s' % array('B', self.event_data).tolist(),
        ]
        return "\n".join(lines)

    @staticmethod
    def type_to_string(entry_type):
        """Return a label for *entry_type*, or None if it is not recognised."""
        if entry_type == SelEntry.TYPE_SYSTEM_EVENT:
            return 'System Event'
        if entry_type in SelEntry.TYPE_OEM_TIMESTAMPED_RANGE:
            return 'OEM timestamped (0x%02x)' % entry_type
        if entry_type in SelEntry.TYPE_OEM_NON_TIMESTAMPED_RANGE:
            return 'OEM non-timestamped (0x%02x)' % entry_type
        return None

    def _from_response(self, data):
        """Decode the 16 bytes of *data* into the attributes of this entry.

        Fields are consumed in order: record id (2 bytes), type (1),
        timestamp (4), generator id (2), EvM revision (1), sensor type (1),
        sensor number (1), event direction/type (1) and event data (3).

        Raises DecodingError when *data* is not exactly 16 bytes long or
        when the record type is neither the system event type nor inside
        one of the two OEM ranges.
        """
        if len(data) != 16:
            raise DecodingError('Invalid SEL record length (%d)' % len(data))

        self.data = data

        # Popping consumes the buffer, so decode from a separate ByteBuffer
        # and leave the stored data untouched.
        buf = ByteBuffer(data)

        self.record_id = buf.pop_unsigned_int(2)
        self.type = buf.pop_unsigned_int(1)
        known_type = (self.type == self.TYPE_SYSTEM_EVENT
                      or self.type in self.TYPE_OEM_TIMESTAMPED_RANGE
                      or self.type in self.TYPE_OEM_NON_TIMESTAMPED_RANGE)
        if not known_type:
            raise DecodingError('Unknown SEL type (0x%02x)' % self.type)

        self.timestamp = buf.pop_unsigned_int(4)
        self.generator_id = buf.pop_unsigned_int(2)
        self.evm_rev = buf.pop_unsigned_int(1)
        self.sensor_type = buf.pop_unsigned_int(1)
        self.sensor_number = buf.pop_unsigned_int(1)

        # Bit 7 selects the direction, the lower seven bits the event type.
        event_desc = buf.pop_unsigned_int(1)
        self.event_direction = (EVENT_DEASSERTION if event_desc & 0x80
                                else EVENT_ASSERTION)
        self.event_type = event_desc & 0x7f
        self.event_data = [buf.pop_unsigned_int(1) for _unused in range(3)]