cloudmatrix/esky

View on GitHub
esky/sudo/sudo_win32.py

Summary

Maintainability
C
7 hrs
Test Coverage
#  Copyright (c) 2009-2010, Cloud Matrix Pty. Ltd.
#  All rights reserved; available under the terms of the BSD License.
"""

  esky.sudo.sudo_win32:  win32 platform-specific functionality for esky.sudo


This module implements the esky.sudo interface using ctypes bindings to the
native win32 API.  In particular, it uses the "runas" verb technique to
launch a process with administrative rights on Windows Vista and above.

"""

import sys
import uuid
import ctypes
import ctypes.wintypes
import subprocess

from esky.sudo import sudo_base as base
import esky.slaveproc


byref = ctypes.byref
sizeof = ctypes.sizeof
kernel32 = ctypes.windll.kernel32
shell32 = ctypes.windll.shell32
advapi32 = ctypes.windll.advapi32

GENERIC_READ = -0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_RDWR = GENERIC_READ | GENERIC_WRITE
OPEN_EXISTING = 3
TOKEN_QUERY = 8
SECURITY_MAX_SID_SIZE = 68
SECURITY_SQOS_PRESENT = 1048576
SECURITY_IDENTIFICATION = 65536
WinBuiltinAdministratorsSid = 26
ERROR_NO_SUCH_LOGON_SESSION = 1312
ERROR_PRIVILEGE_NOT_HELD = 1314
TokenLinkedToken = 19
SEE_MASK_NOCLOSEPROCESS = 0x00000040
SEE_MASK_NOASYNC  = 0x00000100


def _errcheck_bool(value,func,args):
    if not value:
        raise ctypes.WinError()
    return args

class SHELLEXECUTEINFO(ctypes.Structure):
    _fields_ = (
      ("cbSize",ctypes.wintypes.DWORD),
      ("fMask",ctypes.c_ulong),
      ("hwnd",ctypes.wintypes.HANDLE),
      ("lpVerb",ctypes.c_char_p),
      ("lpFile",ctypes.c_char_p),
      ("lpParameters",ctypes.c_char_p),
      ("lpDirectory",ctypes.c_char_p),
      ("nShow",ctypes.c_int),
      ("hInstApp",ctypes.wintypes.HINSTANCE),
      ("lpIDList",ctypes.c_void_p),
      ("lpClass",ctypes.c_char_p),
      ("hKeyClass",ctypes.wintypes.HKEY),
      ("dwHotKey",ctypes.wintypes.DWORD),
      ("hIconOrMonitor",ctypes.wintypes.HANDLE),
      ("hProcess",ctypes.wintypes.HANDLE),
    )


try:
    ShellExecuteEx = shell32.ShellExecuteEx
except AttributeError:
    ShellExecuteEx = None
else:
    ShellExecuteEx.restype = ctypes.wintypes.BOOL
    ShellExecuteEx.errcheck = _errcheck_bool
    ShellExecuteEx.argtypes = (
        ctypes.POINTER(SHELLEXECUTEINFO),
    )

try:
    OpenProcessToken = advapi32.OpenProcessToken
except AttributeError:
    pass
else:
    OpenProcessToken.restype = ctypes.wintypes.BOOL
    OpenProcessToken.errcheck = _errcheck_bool
    OpenProcessToken.argtypes = (
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(ctypes.wintypes.HANDLE)
    )

try:
    CreateWellKnownSid = advapi32.CreateWellKnownSid
except AttributeError:
    pass
else:
    CreateWellKnownSid.restype = ctypes.wintypes.BOOL
    CreateWellKnownSid.errcheck = _errcheck_bool
    CreateWellKnownSid.argtypes = (
        ctypes.wintypes.DWORD,
        ctypes.POINTER(ctypes.wintypes.DWORD),
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.wintypes.DWORD)
    )

try:
    CheckTokenMembership = advapi32.CheckTokenMembership
except AttributeError:
    pass
else:
    CheckTokenMembership.restype = ctypes.wintypes.BOOL
    CheckTokenMembership.errcheck = _errcheck_bool
    CheckTokenMembership.argtypes = (
        ctypes.wintypes.HANDLE,
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.wintypes.BOOL)
    )

try:
    GetTokenInformation = advapi32.GetTokenInformation
except AttributeError:
    pass
else:
    GetTokenInformation.restype = ctypes.wintypes.BOOL
    GetTokenInformation.errcheck = _errcheck_bool
    GetTokenInformation.argtypes = (
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.c_void_p,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(ctypes.wintypes.DWORD)
    )



def has_root():
    """Check whether the user currently has root access."""
    return bool(shell32.IsUserAnAdmin())


def can_get_root():
    """Check whether the user may be able to get root access."""
    #  On XP or lower this is equivalent to has_root()
    if sys.getwindowsversion()[0] < 6:
        return bool(shell32.IsUserAnAdmin())
    #  On Vista or higher, there's the whole UAC token-splitting thing.
    #  Many thanks for Junfeng Zhang for the workflow:
    #      http://blogs.msdn.com/junfeng/archive/2007/01/26/how-to-tell-if-the-current-user-is-in-administrators-group-programmatically.aspx
    proc = kernel32.GetCurrentProcess()
    #  Get the token for the current process.
    try:
        token = ctypes.wintypes.HANDLE()
        OpenProcessToken(proc,TOKEN_QUERY,byref(token))
        try:
            #  Get the administrators SID.
            sid = ctypes.create_string_buffer(SECURITY_MAX_SID_SIZE)
            sz = ctypes.wintypes.DWORD(SECURITY_MAX_SID_SIZE)
            target_sid = WinBuiltinAdministratorsSid
            CreateWellKnownSid(target_sid,None,byref(sid),byref(sz))
            #  Check whether the token has that SID directly.
            has_admin = ctypes.wintypes.BOOL()
            CheckTokenMembership(None,byref(sid),byref(has_admin))
            if has_admin.value:
                return True
            #  Get the linked token.  Failure may mean no linked token.
            lToken = ctypes.wintypes.HANDLE()
            try:
                cls = TokenLinkedToken
                GetTokenInformation(token,cls,byref(lToken),sizeof(lToken),byref(sz))
            except WindowsError, e:
                if e.winerror == ERROR_NO_SUCH_LOGON_SESSION:
                    return False
                elif e.winerror == ERROR_PRIVILEGE_NOT_HELD:
                    return False
                else:
                    raise
            #  Check if the linked token has the admin SID
            try:
                CheckTokenMembership(lToken,byref(sid),byref(has_admin))
                return bool(has_admin.value)
            finally:
                kernel32.CloseHandle(lToken)
        finally:
            kernel32.CloseHandle(token)
    finally:
        kernel32.CloseHandle(proc)



class KillablePopen(subprocess.Popen):
    """Popen that's guaranteed killable, even on python2.5."""
    if not hasattr(subprocess.Popen,"terminate"):
        def terminate(self):
            kernel32.TerminateProcess(self._handle,-1)


class FakePopen(KillablePopen):
    """Popen-alike based on a raw process handle."""
    def __init__(self,handle):
        super(FakePopen,self).__init__(None)
        self._handle = handle
    def terminate(self):
        kernel32.TerminateProcess(self._handle,-1)
    def _execute_child(self,*args,**kwds):
        pass
    

class SecureStringPipe(base.SecureStringPipe):
    """Two-way pipe for securely communicating strings with a sudo subprocess.

    This is the control pipe used for passing command data from the non-sudo
    master process to the sudo slave process.  Use read() to read the next
    string, write() to write the next string.

    On win32, this is implemented using CreateNamedPipe in the non-sudo
    master process, and connecting to the pipe from the sudo slave process.

    Security considerations to prevent hijacking of the pipe:

        * it has a strongly random name, so there can be no race condition
          before the pipe is created.
        * it has nMaxInstances set to 1 so another process cannot spoof the
          pipe while we are still alive.
        * the slave connects with pipe client impersonation disabled.

    A possible attack vector would be to wait until we spawn the slave process,
    capture the name of the pipe, then kill us and re-create the pipe to become
    the new master process.  Not sure what can be done about this, but at the
    very worst this will allow the attacker to call into the esky API with
    root privs; it *shouldn't* be sufficient to crack root on the machine...
    """

    def __init__(self,token=None,pipename=None):
        super(SecureStringPipe,self).__init__(token)
        if pipename is None:
            self.pipename = r"\\.\pipe\esky-" + uuid.uuid4().hex
            if sys.version_info[0] > 2:
                self.pipename = self.pipename.encode('utf8')
            self.pipe = kernel32.CreateNamedPipeA(
                          self.pipename,0x03,0x00,1,8192,8192,0,None
                        )
        else:
            if sys.version_info[0] < 3:
                self.pipename = pipename
            else:
                self.pipename = pipename.encode('utf8')
            self.pipe = None

    def connect(self):
        return SecureStringPipe(self.token,self.pipename)

    def _read(self,size):
        data = ctypes.create_string_buffer(size)
        szread = ctypes.c_int()
        kernel32.ReadFile(self.pipe,data,size,byref(szread),None)
        return data.raw[:szread.value]

    def _write(self,data):
        szwritten = ctypes.c_int()
        kernel32.WriteFile(self.pipe,data,len(data),byref(szwritten),None)

    def close(self):
        if self.pipe is not None:
            kernel32.CloseHandle(self.pipe)
            self.pipe = None
        super(SecureStringPipe,self).close()

    def _open(self):
        if self.pipe is None:
            self.pipe = kernel32.CreateFileA(
                self.pipename,GENERIC_RDWR,0,None,OPEN_EXISTING,
                SECURITY_SQOS_PRESENT|SECURITY_IDENTIFICATION,None
            )
        else:
            kernel32.ConnectNamedPipe(self.pipe,None)

    def _recover(self):
        kernel32.CreateFileA(
            self.pipename,GENERIC_RDWR,0,None,OPEN_EXISTING,
            SECURITY_SQOS_PRESENT|SECURITY_IDENTIFICATION,None
        )


def spawn_sudo(proxy):
    """Spawn the sudo slave process, returning proc and a pipe to message it.

    This function spawns the proxy app with administrator privileges, using
    ShellExecuteEx and the undocumented-but-widely-recommended "runas" verb.
    """
    pipe = SecureStringPipe()
    c_pipe = pipe.connect()
    if getattr(sys,"frozen",False):
        if not esky._startup_hooks_were_run:
            raise OSError(None,"unable to sudo: startup hooks not run")
        exe = [sys.executable]
    else:
        exe = [sys.executable,"-c","import esky; esky.run_startup_hooks()"]
    args = ["--esky-spawn-sudo"]
    args.append(base.b64pickle(proxy))
    args.append(base.b64pickle(c_pipe))
    # Make it a slave process so it dies if we die
    exe = exe + esky.slaveproc.get_slave_process_args() + args
    if sys.getwindowsversion()[0] < 6:
        kwds = {}
        if sys.hexversion >= 0x02060000:
            kwds["close_fds"] = True
        proc = KillablePopen(exe,**kwds)
    else:
        execinfo = SHELLEXECUTEINFO()
        execinfo.cbSize = sizeof(execinfo)
        execinfo.fMask = SEE_MASK_NOCLOSEPROCESS | SEE_MASK_NOASYNC
        execinfo.hwnd = None
        execinfo.lpVerb = b"runas"
        execinfo.lpFile = exe[0].encode('utf-8')
        execinfo.lpParameters = " ".join(exe[1:]).encode('utf-8')
        execinfo.lpDirectory = None
        execinfo.nShow = 0
        ShellExecuteEx(byref(execinfo))
        proc = FakePopen(execinfo.hProcess)
    return (proc,pipe)


def run_startup_hooks():
    if len(sys.argv) > 1 and sys.argv[1] == "--esky-spawn-sudo":
        proxy = base.b64unpickle(sys.argv[2])
        pipe = base.b64unpickle(sys.argv[3])
        proxy.run(pipe)
        sys.exit(0)