kontron/python-ipmi

View on GitHub
pyipmi/interfaces/ipmitool.py

Summary

Maintainability
B
4 hrs
Test Coverage
# Copyright (c) 2014  Kontron Europe GmbH
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA


import re

from subprocess import Popen, PIPE
from array import array

from ..session import Session
from ..errors import IpmiTimeoutError, IpmiConnectionError
from ..logger import log
from ..msgs import encode_message, decode_message, create_message
from ..msgs.constants import CC_OK
from ..utils import py3dec_unic_bytes_fix, ByteBuffer, py3_array_tobytes


class Ipmitool(object):
    """This interface uses the ipmitool raw command.

    This "emulates" a RMCP session by using raw commands.

    It uses the session information to assemble the correct ipmitool
    parameters. Therefore, a session has to be established before any request
    can be sent.
    """

    NAME = 'ipmitool'
    IPMITOOL_PATH = 'ipmitool'
    supported_interfaces = ['lan', 'lanplus', 'serial-terminal', 'open']

    def __init__(self, interface_type='lan'):
        if interface_type in self.supported_interfaces:
            self._interface_type = interface_type
        else:
            raise RuntimeError('interface type %s not supported' %
                               interface_type)

        self.re_completion_code = re.compile(
                r"Unable to send RAW command \(.*rsp=(0x[0-9a-f]+)\)")
        self.re_timeout = re.compile(
                r"Unable to send RAW command \(.*cmd=0x[0-9a-f]+\)")
        self.re_unable_establish = re.compile(
                r".*Unable to establish.*")
        self.re_could_not_open = re.compile(
                r".*Could not open device.*")

        self._session = None

    def establish_session(self, session):
        """Remember the session parameters for later ipmitool calls.

        Nothing is sent to the target here; the session object is only
        stored so the command builders can read its settings.

        :param session: session object holding the connection parameters.
        """
        self._session = session

    def rmcp_ping(self):

        if self._interface_type == 'serial-terminal':
            raise RuntimeError(
                'rcmp_ping not supported on "serial-terminal" interface')

        # for now this uses impitool..
        cmd = self.IPMITOOL_PATH
        cmd += (' -I %s' % self._interface_type)
        cmd += (' -H %s' % self._session.rmcp_host)
        cmd += (' -p %s' % self._session.rmcp_port)
        if self._session.auth_type == Session.AUTH_TYPE_NONE:
            cmd += (' -A NONE')
        elif self._session.auth_type == Session.AUTH_TYPE_PASSWORD:
            cmd += (' -U "%s"' % self._session.auth_username)
            cmd += (' -P "%s"' % self._session.auth_password)
        cmd += (' session info all')

        _, rc = self._run_ipmitool(cmd)
        if rc:
            raise IpmiTimeoutError()

    def is_ipmc_accessible(self, target):
        try:
            self.rmcp_ping()
            accessible = True
        except IpmiTimeoutError:
            accessible = False

        return accessible

    def _parse_output(self, output):
        """Parse the output of an ipmitool raw command.

        :param output: output captured from ipmitool.
        :return: tuple ``(cc, rsp)``. ``cc`` is the completion code taken
            from an ipmitool error line, or None if there was none.
            ``rsp`` is an ``array('B')`` of the hex bytes printed, or None
            if no data was printed.
        :raises IpmiTimeoutError: if a line matches the timeout pattern.
        :raises IpmiConnectionError: if the session could not be
            established.
        :raises RuntimeError: if the IPMI device could not be opened.
        """
        cc = None
        tokens = []

        for line in py3dec_unic_bytes_fix(output).split('\n'):
            # Don't try to parse ipmitool error messages
            if 'failed' in line:
                continue

            # Check for timeout
            if self.re_timeout.match(line):
                raise IpmiTimeoutError()

            # Check for unable to establish session
            if self.re_unable_establish.match(line):
                raise IpmiConnectionError('ipmitool: {}'.format(line))

            # Check for completion code; nothing after it is parsed
            match_completion_code = self.re_completion_code.match(line)
            if match_completion_code:
                cc = int(match_completion_code.group(1), 16)
                break

            # Check for error opening ipmi device
            if self.re_could_not_open.match(line):
                raise RuntimeError('ipmitool failed: {}'.format(output))

            # Whitespace split so blank lines or repeated spaces between
            # data lines do not produce empty tokens for int().
            tokens.extend(line.replace('\r', '').split())

        rsp = array('B', [int(tok, 16) for tok in tokens]) if tokens else None

        return cc, rsp

    def send_and_receive_raw(self, target, lun, netfn, raw_bytes):
        """Run one raw command through ipmitool and return the response.

        :param target: target passed on to the command builder.
        :param lun: logical unit number for the request.
        :param netfn: network function of the request.
        :param raw_bytes: command id and request data.
        :return: response bytes, starting with the completion code.
        :raises RuntimeError: if the interface type is not supported, or if
            ipmitool exits non-zero without reporting a completion code.
        """
        interface = self._interface_type
        if interface in ('lan', 'lanplus'):
            build_cmd = self._build_ipmitool_cmd
        elif interface == 'open':
            build_cmd = self._build_open_ipmitool_cmd
        elif interface == 'serial-terminal':
            build_cmd = self._build_serial_ipmitool_cmd
        else:
            raise RuntimeError('interface type %s not supported' %
                               interface)

        output, rc = self._run_ipmitool(
            build_cmd(target, lun, netfn, raw_bytes))
        cc, rsp = self._parse_output(output)

        if cc is not None:
            # ipmitool reported a completion code; it is the whole response
            data = array('B', [cc])
        elif rc != 0:
            raise RuntimeError('ipmitool failed with rc=%d' % rc)
        else:
            # success: completion code followed by any response data
            data = array('B', [CC_OK])
            if rsp:
                data.extend(rsp)

        hexdump = ''.join('%02x ' % octet for octet in data)
        log().debug('IPMI RX: {:s}'.format(hexdump))

        return py3_array_tobytes(data)

    def send_and_receive(self, req):
        """Send a request message and return the decoded response.

        :param req: request message; its ``cmdid``, ``target``, ``lun``,
            ``netfn`` and ``group_extension`` attributes are used.
        :return: response message created for ``req.netfn + 1`` and filled
            from the bytes returned by send_and_receive_raw().
        """
        log().debug('IPMI Request [%s]', req)

        # Raw payload: command id followed by the encoded request body.
        req_data = ByteBuffer((req.cmdid,))
        req_data.push_string(encode_message(req))

        rsp_data = self.send_and_receive_raw(req.target, req.lun, req.netfn,
                                             py3_array_tobytes(req_data))

        rsp = create_message(req.netfn + 1, req.cmdid, req.group_extension)
        decode_message(rsp, rsp_data)
        log().debug('IPMI Response [%s]', rsp)

        return rsp

    @staticmethod
    def _build_ipmitool_raw_data(lun, netfn, raw):
        cmd = ' -l {:d} raw '.format(lun)
        cmd += ' '.join(['0x%02x' % (d)
                         for d in [netfn] + array('B', raw).tolist()])
        return cmd

    @staticmethod
    def _build_ipmitool_target(target):
        cmd = ''
        if target is None:
            return ''
        if target.routing is not None:
            # we have to do bridging here
            if len(target.routing) == 1:
                pass
            if len(target.routing) == 2:
                # ipmitool/shelfmanager does implicit bridging
                cmd += (' -t 0x%02x' % target.routing[1].rs_sa)
                cmd += (' -b %d' % target.routing[0].channel)
            elif len(target.routing) == 3:
                cmd += (' -T 0x%02x' % target.routing[1].rs_sa)
                cmd += (' -B %d' % target.routing[0].channel)
                cmd += (' -t 0x%02x' % target.routing[2].rs_sa)
                cmd += (' -b %d' % target.routing[1].channel)
            else:
                raise RuntimeError('The impitool interface at most double '
                                   'briding %s' % target)

        elif target.ipmb_address:
            cmd += (' -t 0x%02x' % target.ipmb_address)

        return cmd

    def _build_ipmitool_priv_level(self, level):
        """Return the ``-L`` option for a session privilege level.

        :param level: one of ``Session.PRIV_LEVEL_USER``,
            ``Session.PRIV_LEVEL_OPERATOR`` or
            ``Session.PRIV_LEVEL_ADMINISTRATOR``.
        :raises KeyError: for any other level.
        """
        level_names = {
            Session.PRIV_LEVEL_USER: 'USER',
            Session.PRIV_LEVEL_OPERATOR: 'OPERATOR',
            Session.PRIV_LEVEL_ADMINISTRATOR: 'ADMINISTRATOR',
        }
        name = level_names[level]
        return ' -L %s' % name

    def _build_ipmitool_cmd(self, target, lun, netfn, raw_bytes):
        if not hasattr(self, '_session'):
            raise RuntimeError('Session needs to be set')

        cmd = self.IPMITOOL_PATH
        cmd += (' -I %s' % self._interface_type)
        cmd += (' -H %s' % self._session.rmcp_host)
        cmd += (' -p %s' % self._session.rmcp_port)

        cmd += self._build_ipmitool_priv_level(self._session.priv_level)

        if self._session.auth_type == Session.AUTH_TYPE_NONE:
            cmd += ' -P ""'
        elif self._session.auth_type == Session.AUTH_TYPE_PASSWORD:
            cmd += (' -U "%s"' % self._session.auth_username)
            cmd += (' -P "%s"' % self._session.auth_password)
        else:
            raise RuntimeError('Session type %d not supported' %
                               self._session.auth_type)

        cmd += self._build_ipmitool_target(target)
        cmd += self._build_ipmitool_raw_data(lun, netfn, raw_bytes)
        cmd += (' 2>&1')

        return cmd

    def _build_serial_ipmitool_cmd(self, target, lun, netfn, raw_bytes):
        if not hasattr(self, '_session'):
            raise RuntimeError('Session needs to be set')

        cmd = '{path!s:s} -I {interface!s:s} -D {port!s:s}:{baud!s:s}'\
            .format(
                path=self.IPMITOOL_PATH,
                interface=self._interface_type,
                port=self._session.serial_port,
                baud=self._session.serial_baudrate
            )

        cmd += self._build_ipmitool_target(target)
        cmd += self._build_ipmitool_raw_data(lun, netfn, raw_bytes)

        return cmd

    def _build_open_ipmitool_cmd(self, target, lun, netfn, raw_bytes):
        if not hasattr(self, '_session'):
            raise RuntimeError('Session needs to be set')

        cmd = self.IPMITOOL_PATH
        cmd += (' -I %s' % self._interface_type)

        cmd += self._build_ipmitool_target(target)
        cmd += self._build_ipmitool_raw_data(lun, netfn, raw_bytes)
        cmd += (' 2>&1')

        return cmd

    @staticmethod
    def _run_ipmitool(cmd):
        """Legacy call of ipmitool (will be removed in future).

        Runs ``cmd`` through a shell and captures its standard output.

        :param cmd: complete command line as a single string.
        :return: tuple ``(output, returncode)``.
        :raises RuntimeError: if the return code is 127, which is reported
            as ipmitool not being found.
        """
        log().debug('Running ipmitool "%s"', cmd)

        process = Popen(cmd, shell=True, stdout=PIPE)
        # Only stdout is piped; wait for the process to finish.
        stdout_data, _ = process.communicate()
        exit_code = process.returncode

        log().debug('return with rc=%d, output was:\n%s',
                    exit_code,
                    stdout_data)

        if exit_code == 127:
            raise RuntimeError('ipmitool command not found')

        return stdout_data, exit_code