hackedteam/vector-edk

View on GitHub
vector-uefi/fd/tool/chipsec/helper/oshelper.py

Summary

Maintainability
C
1 day
Test Coverage
#!/usr/local/bin/python
#CHIPSEC: Platform Security Assessment Framework
#Copyright (c) 2010-2014, Intel Corporation
# 
#This program is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public License
#as published by the Free Software Foundation; Version 2.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details.
#
#You should have received a copy of the GNU General Public License
#along with this program; if not, write to the Free Software
#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#
#Contact information:
#chipsec@intel.com
#



# -------------------------------------------------------------------------------
#
# CHIPSEC: Platform Hardware Security Assessment Framework
# (c) 2010-2012 Intel Corporation
#
# -------------------------------------------------------------------------------
#
## \addtogroup core
#@{
# __chipsec/helper/oshelper.py__ -- Abstracts support for various OS/environments; a wrapper around
#                platform-specific code that invokes the kernel driver
#@}
#

import sys
import os
import fnmatch
import re
import errno

import chipsec.file
from chipsec.logger import *
import traceback


_importlib = True
try:
    import importlib
    
except ImportError:
    _importlib = False


# Matches helper modules stored in a frozen build's library.zip, i.e. entries of
# the form "chipsec/helper/<package>/<module>.pyc" (case-insensitive).
# A raw string is used so the regex escapes are not parsed as (invalid)
# string escapes.
ZIP_HELPER_RE = re.compile(r"^chipsec/helper/\w+/\w+\.pyc$", re.IGNORECASE)


def f_mod_zip(x):
    """Filter predicate: truthy if zip entry name *x* is a helper module.

    Entries containing '__init__' are rejected; everything else must match
    ZIP_HELPER_RE.  Returns False, None or a match object, so use it only
    in a boolean context (e.g. with filter()).
    """
    return ( '__init__' not in x and ZIP_HELPER_RE.match(x) )


def map_modname_zip(x):
    """Convert a zip entry path into a dotted module name.

    The text after the last '.' (the file extension) is dropped and every
    '/' becomes '.', e.g. 'chipsec/helper/a/b.pyc' -> 'chipsec.helper.a.b'.
    """
    return (x.rpartition('.')[0]).replace('/', '.')

class OsHelperError(RuntimeError):
    """Error raised by OsHelper when a helper cannot be loaded or started.

    The message is passed to RuntimeError; the caller-supplied code
    (errno values in this file) is kept in the ``errorcode`` attribute.
    """

    def __init__(self, msg, errorcode):
        # Initialise the base exception with the message only.
        RuntimeError.__init__(self, msg)
        self.errorcode = errorcode
        
    
## OS Helper
#
# Abstracts support for various OS/environments; a wrapper around platform-specific code that invokes the kernel driver
class OsHelper:
    """Facade over the platform-specific helper object.

    On construction, helper modules are imported one after another until one
    of them yields a helper via its ``get_helper()`` function.  Every
    hardware-access method of this class forwards to that helper object.
    """

    def __init__(self):
        """Load a helper and copy its OS identification attributes.

        Raises:
            OsHelperError: with errno.ENODEV when no helper could be loaded.
        """
        self.helper = None
        self.loadHelpers()
        if not self.helper:
            import platform
            os_system  = platform.system()
            raise OsHelperError( "Could not load helper for '%s' environment (unsupported environment?)" % os_system, errno.ENODEV )
        self.os_system  = self.helper.os_system
        self.os_release = self.helper.os_release
        self.os_version = self.helper.os_version
        self.os_machine = self.helper.os_machine

    def loadHelpers(self):
        """Pick the discovery strategy: library.zip when frozen, else the file system."""
        if chipsec.file.main_is_frozen():
            self.loadHelpersFromEXE()
        else:
            self.loadHelpersFromFileSystem()

    def loadHelpersFromEXE(self):
        """Try each helper module listed in library.zip in the main directory.

        Stops at the first module that provides a helper.
        """
        import zipfile
        myzip = zipfile.ZipFile(os.path.join(chipsec.file.get_main_dir(), "library.zip"))
        try:
            names = myzip.namelist()
        finally:
            # Only the entry names are needed; do not leak the open archive.
            myzip.close()
        for name in names:
            if not f_mod_zip(name):
                continue
            self.importModule(map_modname_zip(name))
            if self.helper:
                break

    def loadHelpersFromFileSystem(self):
        """Try every ``<subdir>/<module>.py`` found next to this file.

        ``__init__.py`` files are skipped.  Stops at the first module that
        provides a helper.
        """
        mydir = os.path.dirname(__file__)
        for adir in os.listdir(mydir):
            if self.helper:
                break
            mypath = os.path.join(mydir, adir)
            if not os.path.isdir(mypath):
                continue
            for afile in os.listdir(mypath):
                if fnmatch.fnmatch(afile, '__init__.py') or not fnmatch.fnmatch(afile, '*.py'):
                    continue
                mod_shortname = adir + "." + os.path.splitext(afile)[0]
                mod_fullname = "chipsec.helper." + mod_shortname
                self.importModule(mod_fullname)
                if self.helper:
                    break

    def importModule(self, mod_fullname):
        """Import *mod_fullname* and store the result of its ``get_helper()``.

        Failures never propagate: an ImportError is logged only in
        trace/verbose mode, any other Exception is logged as an error.
        ``self.helper`` is left unchanged on failure.  Does nothing when
        importlib is unavailable.
        """
        if not _importlib:
            return
        try:
            module = importlib.import_module( mod_fullname )
            self.helper = getattr( module, 'get_helper' )()
        except ImportError as msg:
            # @TODO: UTIL_TRACE/VERBOSE aren't correct here
            if logger().UTIL_TRACE or logger().VERBOSE:
                logger().warn( str(msg) + ' ' + mod_fullname )
                logger().log_bad( traceback.format_exc() )
        except Exception as msg:
            # Exception rather than BaseException, so Ctrl-C / sys.exit()
            # during a helper import are not swallowed.
            logger().error( str(msg) + ' ' + mod_fullname )
            logger().log_bad( traceback.format_exc() )

    def __del__(self):
        """Best-effort cleanup of the helper when this object is finalised."""
        # __init__ may have failed before a helper was loaded.
        if getattr(self, 'helper', None) is None:
            return
        try:
            self.destroy()
        except Exception:
            # A finalizer must not raise; the helper may already be deleted
            # or the interpreter may be shutting down.
            pass

    def start( self ):
        """Create and start the helper.

        Raises:
            OsHelperError: wrapping any failure; carries the original
                exception's ``errorcode`` if it has one, else errno.ENXIO.
        """
        try:
            self.helper.create()
            self.helper.start()
        except Exception as msg:
            error_no = errno.ENXIO
            if hasattr(msg, 'errorcode'):
                error_no = msg.errorcode
            raise OsHelperError("Could not start the OS Helper, are you running as Admin/root?\n           Message: \"%s\"" % msg, error_no)

    def stop( self ):
        """Stop the helper."""
        self.helper.stop()

    def destroy( self ):
        """Delete the helper (calls the helper's ``delete()``)."""
        self.helper.delete()

    def is_linux( self ):
        """Return True if the helper reported 'linux' as the OS (case-insensitive)."""
        return ('linux' == self.os_system.lower())

    def is_windows( self ):
        """Return True if the helper reported 'windows' as the OS (case-insensitive)."""
        return ('windows' == self.os_system.lower())

    def is_win8_or_greater( self ):
        """Return True on Windows when the release string indicates Windows 8 or a listed Server release."""
        if not self.is_windows():
            return False
        release = self.os_release
        # NOTE(review): '2008Server' is matched here as well -- confirm that is intended.
        return ( release.startswith('8') or ('2008Server' in release) or ('2012Server' in release) )


    #################################################################################################
    # Actual OS helper functionality accessible to HAL components

    #
    # Read/Write PCI configuration registers via legacy CF8/CFC ports
    #
    def read_pci_reg( self, bus, device, function, address, size ):
        """Read a PCI config register; warns if *address* is not a multiple of *size*."""
        if ( 0 != (address & (size - 1)) ):
            logger().warn( "Config register address is not naturally aligned" )
        return self.helper.read_pci_reg( bus, device, function, address, size )

    def write_pci_reg( self, bus, device, function, address, value, size ):
        """Write a PCI config register; warns if *address* is not a multiple of *size*."""
        if ( 0 != (address & (size - 1)) ):
            logger().warn( "Config register address is not naturally aligned" )
        return self.helper.write_pci_reg( bus, device, function, address, value, size )

    #
    # phys_address_hi/phys_address_lo (and max_pa_hi/max_pa_lo) are 32 bit integers
    #
    def read_phys_mem( self, phys_address_hi, phys_address_lo, length ):
        """Read *length* bytes of physical memory at the hi:lo address."""
        return self.helper.read_phys_mem( phys_address_hi, phys_address_lo, length )

    def write_phys_mem( self, phys_address_hi, phys_address_lo, length, buf ):
        """Write *buf* (*length* bytes) to physical memory at the hi:lo address."""
        return self.helper.write_phys_mem( phys_address_hi, phys_address_lo, length, buf )

    def alloc_phys_mem( self, length, max_pa_hi, max_pa_lo ):
        """Allocate *length* bytes of physical memory.

        The two 32-bit halves are combined into the single 64-bit maximum
        physical address that the helper's ``alloc_phys_mem`` takes.
        """
        return self.helper.alloc_phys_mem( length, ((max_pa_hi << 32) | max_pa_lo) )

    #
    # physical_address is 64 bit integer
    #
    def read_physical_mem( self, phys_address, length ):
        """Read *length* bytes at 64-bit *phys_address* (split into hi/lo for the helper)."""
        return self.helper.read_phys_mem( (phys_address>>32)&0xFFFFFFFF, phys_address&0xFFFFFFFF, length )

    def write_physical_mem( self, phys_address, length, buf ):
        """Write *buf* at 64-bit *phys_address* (split into hi/lo for the helper)."""
        return self.helper.write_phys_mem( (phys_address>>32)&0xFFFFFFFF, phys_address&0xFFFFFFFF, length, buf )

    def alloc_physical_mem( self, length, max_phys_address ):
        """Allocate *length* bytes of physical memory, passing *max_phys_address* through unchanged."""
        return self.helper.alloc_phys_mem( length, max_phys_address )


    #
    # Read/Write I/O port
    #
    def read_io_port( self, io_port, size ):
        """Forward an I/O port read to the helper."""
        return self.helper.read_io_port( io_port, size )

    def write_io_port( self, io_port, value, size ):
        """Forward an I/O port write to the helper."""
        return self.helper.write_io_port( io_port, value, size )

    #
    # Read/Write MSR on a specific CPU thread
    #
    def read_msr( self, cpu_thread_id, msr_addr ):
        """Forward an MSR read on *cpu_thread_id* to the helper."""
        return self.helper.read_msr( cpu_thread_id, msr_addr )

    def write_msr( self, cpu_thread_id, msr_addr, eax, edx ):
        """Forward an MSR write on *cpu_thread_id* to the helper."""
        return self.helper.write_msr( cpu_thread_id, msr_addr, eax, edx )

    #
    # Load CPU microcode update on a specific CPU thread
    #
    def load_ucode_update( self, cpu_thread_id, ucode_update_buf ):
        """Forward a microcode update request on *cpu_thread_id* to the helper."""
        return self.helper.load_ucode_update( cpu_thread_id, ucode_update_buf )

    #
    # Read IDTR/GDTR/LDTR on a specific CPU thread
    #
    def get_descriptor_table( self, cpu_thread_id, desc_table_code ):
        """Forward a descriptor table query on *cpu_thread_id* to the helper."""
        return self.helper.get_descriptor_table( cpu_thread_id, desc_table_code )

    #
    # EFI Variable API
    #
    def get_EFI_variable( self, name, guid ):
        """Forward an EFI variable read to the helper."""
        return self.helper.get_EFI_variable( name, guid )

    def set_EFI_variable( self, name, guid, var, attrs=None ):
        """Forward an EFI variable write to the helper."""
        return self.helper.set_EFI_variable( name, guid, var, attrs )

    def list_EFI_variables( self ):
        """Forward an EFI variable listing request to the helper."""
        return self.helper.list_EFI_variables()

    #
    # Xen Hypercall
    #
    def do_hypercall( self, vector, arg1=0, arg2=0, arg3=0, arg4=0, arg5=0, use_peach=0 ):
        """Forward a hypercall with up to five arguments to the helper."""
        return self.helper.do_hypercall( vector, arg1, arg2, arg3, arg4, arg5, use_peach)

    #
    # CPUID
    #
    def cpuid( self, eax ):
        """Forward a CPUID query for leaf *eax* to the helper."""
        return self.helper.cpuid( eax )

    def get_threads_count( self ):
        """Return the helper's thread count."""
        return self.helper.get_threads_count()

    def getcwd( self ):
        """Return the helper's current working directory."""
        return self.helper.getcwd()

    #
    # Interrupts
    #
    def send_sw_smi( self, SMI_code_data, _rax, _rbx, _rcx, _rdx, _rsi, _rdi ):
        """Forward a software SMI request with the given register values to the helper."""
        return self.helper.send_sw_smi( SMI_code_data, _rax, _rbx, _rcx, _rdx, _rsi, _rdi )


# Module-wide OsHelper instance, created once when this module is imported.
_helper = OsHelper()


def helper():
    """Return the shared module-level OsHelper instance."""
    return _helper