kontron/python-ipmi

View on GitHub
pyipmi/interfaces/aardvark.py

Summary

Maintainability
D
2 days
Test Coverage
# Copyright (c) 2014  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA

import time
from array import array

from ..msgs import create_message, encode_message, decode_message
from ..errors import IpmiTimeoutError
from ..logger import log
from ..interfaces.ipmb import IpmbHeaderReq, checksum, rx_filter, encode_ipmb_msg
from ..utils import py3_array_tobytes

# pyaardvark is an optional dependency: when it cannot be loaded the name is
# bound to None so the module still imports (Aardvark.__init__ checks for it).
# The original handlers were annotated "python 2" for ImportError and
# "python 3" for RuntimeError.
try:
    import pyaardvark
except (ImportError, RuntimeError):
    pyaardvark = None

class Aardvark(object):
    """This interface uses an I2C USB adapter.

    Requests are written to the bus as I2C master transfers; the adapter is
    additionally enabled as an I2C slave so that responses can be read back
    and matched against the request header.
    """

    NAME = 'aardvark'

    def __init__(self, slave_address=0x20, port=0, serial_number=None,
                 enable_i2c_pullups=None, enable_target_power=None,
                 enable_fastmode=None):
        """Open the adapter and apply the requested bus configuration.

        slave_address: own address used as requester address in the IPMB
            header; shifted right by one bit before it is registered as
            the adapter's I2C slave address
        port: forwarded to pyaardvark.open
        serial_number: forwarded to pyaardvark.open
        enable_i2c_pullups: if truthy, forwarded to enable_pullups
        enable_target_power: if truthy, forwarded to enable_target_power
        enable_fastmode: forwarded to enable_fastmode; None is treated
            as False

        Raises RuntimeError if the pyaardvark module could not be imported.
        """
        if pyaardvark is None:
            raise RuntimeError('No pyaardvark module found. You can not '
                               'use this interface.')

        self.slave_address = slave_address
        # time (in seconds) to wait for a response to a single request
        self.timeout = 0.25
        # number of send/receive attempts before giving up
        self.max_retries = 3
        self.next_sequence_number = 0

        self._dev = pyaardvark.open(port, serial_number)
        self._dev.enable_i2c_slave(self.slave_address >> 1)

        # NOTE(review): a falsy value (including an explicit False) leaves
        # the adapter's current pull-up / target-power setting untouched.
        if enable_i2c_pullups:
            self.enable_pullups(enable_i2c_pullups)
        if enable_target_power:
            self.enable_target_power(enable_target_power)

        # The bitrate is always set explicitly; no preference means 100.
        self.enable_fastmode(
            False if enable_fastmode is None else enable_fastmode)

    def enable_pullups(self, enabled):
        """Set the adapter's ``i2c_pullups`` attribute to `enabled`."""
        self._dev.i2c_pullups = enabled

    def enable_target_power(self, enabled):
        """Set the adapter's ``target_power`` attribute to `enabled`."""
        self._dev.target_power = enabled

    def enable_fastmode(self, enabled):
        """Select the I2C bitrate: 400 if `enabled` is truthy, else 100.

        The value is written to the adapter's ``i2c_bitrate`` attribute
        (presumably kHz - see the pyaardvark documentation).
        """
        self._dev.i2c_bitrate = 400 if enabled else 100

    def raw_write(self, address, data):
        """Write `data` to I2C `address` without any IPMB framing."""
        self._dev.i2c_master_write(address, data)

    def establish_session(self, session):
        """Remember the session parameters; nothing is sent on the bus."""
        self._session = session

    def close_session(self):
        """Close the underlying adapter handle."""
        self._dev.close()

    def is_ipmc_accessible(self, target):
        """Probe `target` with a netfn 6 / cmd 1 request without payload.

        target: IPMI target (its ``ipmb_address`` is used)

        The current sequence number is used as is (it is not incremented)
        and the request is sent exactly once, without retries.

        Returns True once a matching response has been received. This
        method never returns False: if no response arrives, the
        IpmiTimeoutError raised by the receive path propagates.
        """
        header = self._build_header(target, lun=0, netfn=6, cmdid=1)
        self._send_raw(header, None)
        self._receive_raw(header)
        return True

    def _inc_sequence_number(self):
        """Advance the request sequence number, wrapping around at 64."""
        self.next_sequence_number = (self.next_sequence_number + 1) % 64

    def _build_header(self, target, lun, netfn, cmdid):
        """Assemble the IPMB request header for the next transfer."""
        header = IpmbHeaderReq()
        header.netfn = netfn
        header.rs_lun = lun
        header.rs_sa = target.ipmb_address
        header.rq_seq = self.next_sequence_number
        header.rq_lun = 0
        header.rq_sa = self.slave_address
        header.cmdid = cmdid
        return header

    @staticmethod
    def _encode_ipmb_msg_req(header, cmd_data):
        """Return the encoded header followed by `cmd_data` and a checksum.

        The checksum is calculated over everything after the first two
        bytes of the encoded data. Not referenced by the other methods of
        this class, which use encode_ipmb_msg instead.
        """
        data = header.encode()
        data.extend(cmd_data)
        data.append(checksum(data[2:]))

        return data

    def _send_raw(self, header, raw_bytes):
        """Encode an IPMB request and write it to the bus.

        header: IPMB request header
        raw_bytes: payload handed to encode_ipmb_msg (may be None)
        """
        raw_bytes = encode_ipmb_msg(header, raw_bytes)
        i2c_addr = header.rs_sa >> 1

        raw_bytes = array('B', raw_bytes)
        log().debug('I2C TX to %02Xh [%s]', i2c_addr,
                    ' '.join(['%02x' % b for b in raw_bytes]))
        # The first encoded byte is not written as data; presumably it is
        # the responder address, which is already passed as I2C address.
        self._dev.i2c_master_write(i2c_addr, raw_bytes[1:])

    def _receive_raw(self, header):
        """Wait for the response that belongs to `header`.

        Messages rejected by rx_filter are skipped as long as time is left.

        Returns the received data (without the leading address byte) as
        array('B').

        Raises IpmiTimeoutError if `self.timeout` seconds have elapsed or
        if polling the adapter reports no pending event.
        """
        start_time = time.time()
        while True:
            remaining = self.timeout - (time.time() - start_time)
            if remaining <= 0:
                raise IpmiTimeoutError()

            # poll returns an empty list if no event is pending
            if not self._dev.poll(int(remaining * 1000)):
                raise IpmiTimeoutError()

            (i2c_addr, rx_data) = self._dev.i2c_slave_read()
            rx_data = array('B', rx_data)
            log().debug('I2C RX from %02Xh [%s]', i2c_addr << 1,
                        ' '.join(['%02x' % c for c in rx_data]))

            # rx_filter is handed the message with the shifted I2C address
            # put back in front of the received data.
            rq_sa = array('B', [i2c_addr << 1, ])
            if rx_filter(header, rq_sa + rx_data):
                return rx_data

    def _send_and_receive(self, target, lun, netfn, cmdid, payload):
        """Send and receive data using aardvark interface.

        target: IPMI target (its ``ipmb_address`` is used)
        lun: logical unit number of the responder
        netfn: network function
        cmdid: command ID
        payload: IPMI message payload as bytestring

        The request is attempted up to `self.max_retries` times. After a
        failed attempt that is followed by another one, the method sleeps
        0.2 s times the number of attempts made so far.

        Returns the received data as bytestring, with the first five bytes
        and the last byte removed (presumably the rest of the IPMB header
        and the trailing checksum).

        Raises IpmiTimeoutError if every attempt ended in a timeout or an
        IOError.
        """
        self._inc_sequence_number()
        header = self._build_header(target, lun, netfn, cmdid)

        attempts = 0
        while attempts < self.max_retries:
            try:
                self._send_raw(header, payload)
                rx_data = self._receive_raw(header)
                break
            except (IpmiTimeoutError, IOError):
                # handled by the retry logic below
                pass

            attempts += 1
            # Back off only if another attempt follows; sleeping after the
            # final failure would merely delay the error.
            if attempts < self.max_retries:
                time.sleep(attempts * 0.2)
        else:
            raise IpmiTimeoutError()

        return py3_array_tobytes(rx_data)[5:-1]

    def send_and_receive_raw(self, target, lun, netfn, raw_bytes):
        """Interface function to send and receive raw message.

        target: IPMI target
        lun: logical unit number
        netfn: network function
        raw_bytes: RAW bytes as bytestring; the first byte is used as
            command ID, the remaining bytes as payload

        Returns the IPMI message response bytestring.
        """
        return self._send_and_receive(target=target,
                                      lun=lun,
                                      netfn=netfn,
                                      cmdid=array('B', raw_bytes)[0],
                                      payload=raw_bytes[1:])

    def send_and_receive(self, req):
        """Interface function to send and receive an IPMI message.

        req: IPMI message request; its ``target``, ``lun``, ``netfn``,
            ``cmdid`` and ``group_extension`` attributes are used

        Returns the IPMI message response.
        """
        log().debug('IPMI Request [%s]', req)

        rx_data = self._send_and_receive(target=req.target,
                                         lun=req.lun,
                                         netfn=req.netfn,
                                         cmdid=req.cmdid,
                                         payload=encode_message(req))
        # the response message is created for netfn + 1 of the request
        rsp = create_message(req.netfn + 1, req.cmdid, req.group_extension)
        decode_message(rsp, rx_data)

        log().debug('IPMI Response [%s]', rsp)

        return rsp