EventGhost/EventGhost

View on GitHub
eg/WinApi/pywin32_patches/dde.py

Summary

Maintainability
B
5 hrs
Test Coverage
# -*- coding: utf-8 -*-
#
# This file is part of EventGhost.
# Copyright © 2005-2020 EventGhost Project <http://www.eventghost.net/>
#
# EventGhost is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option)
# any later version.
#
# EventGhost is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with EventGhost. If not, see <http://www.gnu.org/licenses/>.

# This is a modified version of the dde client made by noisywiz. Original code
# can be found at https://gist.github.com/noisywiz/2661065.
# The modifications make the client work like the pywin32 dde client.
# It does not provide full functionality. More will be added as it is needed.

from ctypes import (
    POINTER,
    WINFUNCTYPE,
    c_char_p,
    c_void_p,
    c_int,
    c_ulong,
    c_char_p
)
from ctypes.wintypes import (
    BOOL,
    DWORD,
    BYTE,
    INT,
    LPCWSTR,
    UINT,
    ULONG
)
import time


HCONV = c_void_p
HDDEDATA = c_void_p
HSZ = c_void_p
LPBYTE = c_char_p
LPDWORD = POINTER(DWORD)
LPSTR = c_char_p
ULONG_PTR = c_ulong

# See windows/ddeml.h for declaration of struct CONVCONTEXT
PCONVCONTEXT = c_void_p

DMLERR_NO_ERROR = 0

# Predefined Clipboard Formats
CF_TEXT = 1
CF_BITMAP = 2
CF_METAFILEPICT = 3
CF_SYLK = 4
CF_DIF = 5
CF_TIFF = 6
CF_OEMTEXT = 7
CF_DIB = 8
CF_PALETTE = 9
CF_PENDATA = 10
CF_RIFF = 11
CF_WAVE = 12
CF_UNICODETEXT = 13
CF_ENHMETAFILE = 14
CF_HDROP = 15
CF_LOCALE = 16
CF_DIBV5 = 17
CF_MAX = 18

DDE_FACK = 0x8000
DDE_FBUSY = 0x4000
DDE_FDEFERUPD = 0x4000
DDE_FACKREQ = 0x8000
DDE_FRELEASE = 0x2000
DDE_FREQUESTED = 0x1000
DDE_FAPPSTATUS = 0x00FF
DDE_FNOTPROCESSED = 0x0000

DDE_FACKRESERVED = (~(DDE_FACK | DDE_FBUSY | DDE_FAPPSTATUS))
DDE_FADVRESERVED = (~(DDE_FACKREQ | DDE_FDEFERUPD))
DDE_FDATRESERVED = (~(DDE_FACKREQ | DDE_FRELEASE | DDE_FREQUESTED))
DDE_FPOKRESERVED = (~(DDE_FRELEASE))

XTYPF_NOBLOCK = 0x0002
XTYPF_NODATA = 0x0004
XTYPF_ACKREQ = 0x0008

XCLASS_MASK = 0xFC00
XCLASS_BOOL = 0x1000
XCLASS_DATA = 0x2000
XCLASS_FLAGS = 0x4000
XCLASS_NOTIFICATION = 0x8000

XTYP_ERROR = (0x0000 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_ADVDATA = (0x0010 | XCLASS_FLAGS)
XTYP_ADVREQ = (0x0020 | XCLASS_DATA | XTYPF_NOBLOCK)
XTYP_ADVSTART = (0x0030 | XCLASS_BOOL)
XTYP_ADVSTOP = (0x0040 | XCLASS_NOTIFICATION)
XTYP_EXECUTE = (0x0050 | XCLASS_FLAGS)
XTYP_CONNECT = (0x0060 | XCLASS_BOOL | XTYPF_NOBLOCK)
XTYP_CONNECT_CONFIRM = (0x0070 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_XACT_COMPLETE = (0x0080 | XCLASS_NOTIFICATION)
XTYP_POKE = (0x0090 | XCLASS_FLAGS)
XTYP_REGISTER = (0x00A0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_REQUEST = (0x00B0 | XCLASS_DATA)
XTYP_DISCONNECT = (0x00C0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_UNREGISTER = (0x00D0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_WILDCONNECT = (0x00E0 | XCLASS_DATA | XTYPF_NOBLOCK)
XTYP_MONITOR = (0x00F0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)

XTYP_MASK = 0x00F0
XTYP_SHIFT = 4

TIMEOUT_ASYNC = 0xFFFFFFFF


def get_winfunc(libname, funcname, restype=None, argtypes=(), _libcache={}):
    """Retrieve a function from a library, and set the data types."""
    from ctypes import windll

    if libname not in _libcache:
        _libcache[libname] = windll.LoadLibrary(libname)
    func = getattr(_libcache[libname], funcname)
    func.argtypes = argtypes
    func.restype = restype

    return func


DDECALLBACK = WINFUNCTYPE(
    HDDEDATA,
    UINT,
    UINT,
    HCONV,
    HSZ,
    HSZ,
    HDDEDATA,
    ULONG_PTR,
    ULONG_PTR
)


class DDE(object):
    """Object containing all the DDE functions"""
    AccessData = get_winfunc(
        "user32",
        "DdeAccessData",
        LPBYTE,
        (HDDEDATA, LPDWORD)
    )
    ClientTransaction = get_winfunc(
        "user32",
        "DdeClientTransaction",
        HDDEDATA,
        (LPBYTE, DWORD, HCONV, HSZ, UINT, UINT, DWORD, LPDWORD)
    )
    Connect = get_winfunc(
        "user32",
        "DdeConnect",
        HCONV,
        (DWORD, HSZ, HSZ, PCONVCONTEXT)
    )
    CreateStringHandle = get_winfunc(
        "user32",
        "DdeCreateStringHandleW",
        HSZ,
        (DWORD, LPCWSTR, UINT)
    )
    Disconnect = get_winfunc(
        "user32",
        "DdeDisconnect",
        BOOL,
        (HCONV,)
    )
    GetLastError = get_winfunc(
        "user32",
        "DdeGetLastError",
        UINT,
        (DWORD,)
    )
    Initialize = get_winfunc(
        "user32",
        "DdeInitializeW",
        UINT,
        (LPDWORD, DDECALLBACK, DWORD, DWORD)
    )
    FreeDataHandle = get_winfunc(
        "user32",
        "DdeFreeDataHandle",
        BOOL,
        (HDDEDATA,)
    )
    FreeStringHandle = get_winfunc(
        "user32",
        "DdeFreeStringHandle",
        BOOL,
        (DWORD, HSZ)
    )
    QueryString = get_winfunc(
        "user32",
        "DdeQueryStringA",
        DWORD,
        (DWORD, HSZ, LPSTR, DWORD, c_int)
    )
    UnaccessData = get_winfunc("user32", "DdeUnaccessData", BOOL, (HDDEDATA,))
    Uninitialize = get_winfunc("user32", "DdeUninitialize", BOOL, (DWORD,))


class DDEError(RuntimeError):
    """Exception raise when a DDE errpr occures."""

    def __init__(self, msg, idInst=None):
        if idInst is None:
            RuntimeError.__init__(self, msg)
        else:
            RuntimeError.__init__(self,
                "%s (err=%s)" % (msg, hex(DDE.GetLastError(idInst))))


class DDEClient(object):
    """The DDEClient class.

    Use this class to create and manage a connection to a service/topic.  To get
    classbacks subclass DDEClient and overwrite callback."""

    def __init__(self):
        """Create a connection to a service/topic."""
        from ctypes import byref

        self._idInst = DWORD(0)
        self._hConv = HCONV()

        self._callback = DDECALLBACK(self._callback)
        res = DDE.Initialize(
            byref(self._idInst),
            self._callback,
            0x00000010,
            0
        )
        if res != DMLERR_NO_ERROR:
            raise DDEError("Unable to register with DDEML (err=%s)" % hex(res))

    def Create(self, *args):
        pass

    def ConnectTo(self, service, topic):
        hszService = DDE.CreateStringHandle(self._idInst, service, 1200)
        hszTopic = DDE.CreateStringHandle(self._idInst, topic, 1200)
        self._hConv = DDE.Connect(
            self._idInst,
            hszService,
            hszTopic,
            PCONVCONTEXT()
        )
        DDE.FreeStringHandle(self._idInst, hszTopic)
        DDE.FreeStringHandle(self._idInst, hszService)

    def Connected(self):
        return bool(self._hConv)

    def Shutdown(self):
        if self._hConv:
            DDE.Disconnect(self._hConv)
        if self._idInst:
            DDE.Uninitialize(self._idInst)

    def __del__(self):
        """Cleanup any active connections."""
        if self._hConv:
            DDE.Disconnect(self._hConv)
        if self._idInst:
            DDE.Uninitialize(self._idInst)

    def advise(self, item, stop=False):
        """Request updates when DDE data changes."""
        from ctypes import byref

        hszItem = DDE.CreateStringHandle(self._idInst, item, 1200)
        hDdeData = DDE.ClientTransaction(LPBYTE(), 0, self._hConv, hszItem,
            CF_TEXT, XTYP_ADVSTOP if stop else XTYP_ADVSTART, TIMEOUT_ASYNC,
            LPDWORD())
        DDE.FreeStringHandle(self._idInst, hszItem)
        if not hDdeData:
            raise DDEError(
                "Unable to %s advise" % ("stop" if stop else "start"),
                self._idInst
            )
        DDE.FreeDataHandle(hDdeData)

    def Exec(self, command, timeout=5000):
        """Execute a DDE command."""
        pData = c_char_p(command)
        cbData = DWORD(len(command) + 1)
        hDdeData = DDE.ClientTransaction(
            pData,
            cbData,
            self._hConv,
            HSZ(),
            CF_TEXT,
            XTYP_EXECUTE,
            timeout,
            LPDWORD()
        )
        if not hDdeData:
            raise DDEError("Unable to send command", self._idInst)
        DDE.FreeDataHandle(hDdeData)

    def request(self, item, timeout=5000):
        """Request data from DDE service."""
        from ctypes import byref

        hszItem = DDE.CreateStringHandle(self._idInst, item, 1200)
        hDdeData = DDE.ClientTransaction(
            LPBYTE(),
            0,
            self._hConv,
            hszItem,
            CF_TEXT,
            XTYP_REQUEST,
            timeout,
            LPDWORD()
        )
        DDE.FreeStringHandle(self._idInst, hszItem)
        if not hDdeData:
            raise DDEError("Unable to request item", self._idInst)

        pData = None
        if timeout != TIMEOUT_ASYNC:
            pdwSize = DWORD(0)
            try:
                pData = DDE.AccessData(hDdeData, byref(pdwSize))
            except:
                pData = None

        if not pData:
            time.sleep(0.05)
            pData = self.request(item)
        return pData

    def callback(self, value, item=None):
        """Calback function for advice."""
        print "%s: %s" % (item, value)

    def _callback(
        self,
        wType,
        uFmt,
        hConv,
        hsz1,
        hsz2,
        hDdeData,
        dwData1,
        dwData2
    ):
        # if wType == XTYP_ADVDATA:
        from ctypes import byref, create_string_buffer

        dwSize = DWORD(0)
        pData = DDE.AccessData(hDdeData, byref(dwSize))
        if pData:
            item = create_string_buffer('\000' * 128)
            DDE.QueryString(self._idInst, hsz2, item, 128, 1004)
            self.callback(pData, item.value)
            DDE.UnaccessData(hDdeData)
        return DDE_FACK


def WinMSGLoop():
    """Run the main windows message loop."""
    from ctypes import POINTER, byref, c_ulong
    from ctypes.wintypes import BOOL, HWND, MSG, UINT

    LPMSG = POINTER(MSG)
    LRESULT = c_ulong
    GetMessage = get_winfunc(
        "user32",
        "GetMessageW",
        BOOL,
        (LPMSG, HWND, UINT, UINT)
    )
    TranslateMessage = get_winfunc(
        "user32",
        "TranslateMessage",
        BOOL,
        (LPMSG,)
    )
    # restype = LRESULT
    DispatchMessage = get_winfunc(
        "user32",
        "DispatchMessageW",
        LRESULT,
        (LPMSG,)
    )

    msg = MSG()
    lpmsg = byref(msg)
    while GetMessage(lpmsg, HWND(), 0, 0) > 0:
        TranslateMessage(lpmsg)
        DispatchMessage(lpmsg)


def CreateServer():
    return DDEClient()


def CreateConversation(server):
    return server