kontron/python-ipmi

View on GitHub
pyipmi/interfaces/ipmbdev.py

Summary

Maintainability
D
2 days
Test Coverage
import os
import select
import time
from array import array

from ..msgs import create_message, encode_message, decode_message
from ..errors import IpmiTimeoutError
from ..logger import log
from ..interfaces.ipmb import IpmbHeaderReq, checksum, rx_filter, encode_ipmb_msg


class IpmbDev(object):
    """This interface uses ipmb-dev-int linux driver."""

    NAME = 'ipmbdev'

    def __init__(self, slave_address=0x20, port='/dev/ipmb-0'):
        # TODO: slave address is currently not defined here
        self.slave_address = slave_address
        self.timeout = 0.25
        self.max_retries = 3
        self.next_sequence_number = 0

        self._dev = os.open(port, os.O_RDWR)

    def establish_session(self, session):
        # just remember session parameters here
        self._session = session

    def close_session(self):
        os.close(self._dev)

    def is_ipmc_accessible(self, target):
        header = IpmbHeaderReq()
        header.netfn = 6
        header.rs_lun = 0
        header.rs_sa = target.ipmb_address
        header.rq_seq = self.next_sequence_number
        header.rq_lun = 0
        header.rq_sa = self.slave_address
        header.cmdid = 1
        self._send_raw(header, None)
        self._receive_raw(header)
        return True

    def _inc_sequence_number(self):
        self.next_sequence_number = (self.next_sequence_number + 1) % 64

    @staticmethod
    def _encode_ipmb_msg_req(header, cmd_data):
        data = header.encode()
        data.extend(cmd_data)
        data.append(checksum(data[2:]))

        return data

    def _send_raw(self, header, raw_bytes):
        raw_bytes = encode_ipmb_msg(header, raw_bytes)
        i2c_addr = header.rs_sa >> 1

        log().debug('I2C TX to %02Xh [%s]', i2c_addr,
                    ' '.join(['%02x' % b for b in raw_bytes]))
        os.write(self._dev, bytes([len(raw_bytes)]) + raw_bytes)

    def _receive_raw(self, header):
        start_time = time.time()
        rsp_received = False
        poll_returned_no_data = False
        while not rsp_received:
            timeout = self.timeout - (time.time() - start_time)

            if timeout <= 0 or poll_returned_no_data:
                raise IpmiTimeoutError()

            r, w, e = select.select([self._dev], [], [], timeout)
            if self._dev not in r:
                poll_returned_no_data = True
                continue

            rx_data = os.read(self._dev, 256)
            # ipmb-dev-int puts message length into first byte
            assert rx_data[0] == len(rx_data) - 1
            rx_data = rx_data[1:]

            rx_data = array('B', rx_data)
            log().debug('I2C RX from %02Xh [%s]', rx_data[3],
                        ' '.join(['%02x' % c for c in rx_data]))

            rsp_received = rx_filter(header, rx_data)
            rx_data = rx_data[1:]

        return rx_data

    def _send_and_receive(self, target, lun, netfn, cmdid, payload):
        """Send and receive data using ipmb-dev-int interface.

        target:
        lun:
        netfn:
        cmdid:
        payload: IPMI message payload as bytestring

        Returns the received data as bytestring
        """
        self._inc_sequence_number()

        # assemble IPMB header
        header = IpmbHeaderReq()
        header.netfn = netfn
        header.rs_lun = lun
        header.rs_sa = target.ipmb_address
        header.rq_seq = self.next_sequence_number
        header.rq_lun = 0
        header.rq_sa = self.slave_address
        header.cmdid = cmdid

        retries = 0
        while retries < self.max_retries:
            try:
                self._send_raw(header, payload)
                rx_data = self._receive_raw(header)
                break
            except IpmiTimeoutError:
                pass
            except IOError:
                pass

            retries += 1
            time.sleep(retries * 0.2)

        else:
            raise IpmiTimeoutError()

        return rx_data[5:-1]

    def send_and_receive_raw(self, target, lun, netfn, raw_bytes):
        """Interface function to send and receive raw message.

        target: IPMI target
        lun: logical unit number
        netfn: network function
        raw_bytes: RAW bytes as bytestring

        Returns the IPMI message response bytestring.
        """
        return self._send_and_receive(target=target,
                                      lun=lun,
                                      netfn=netfn,
                                      cmdid=array('B', raw_bytes)[0],
                                      payload=raw_bytes[1:])

    def send_and_receive(self, req):
        """Interface function to send and receive an IPMI message.

        target: IPMI target
        req: IPMI message request

        Returns the IPMI message response.
        """
        log().debug('IPMI Request [%s]', req)

        rx_data = self._send_and_receive(target=req.target,
                                         lun=req.lun,
                                         netfn=req.netfn,
                                         cmdid=req.cmdid,
                                         payload=encode_message(req))
        rsp = create_message(req.netfn + 1, req.cmdid, req.group_extension)
        decode_message(rsp, rx_data)

        log().debug('IPMI Response [%s])', rsp)

        return rsp