eg/WinApi/pywin32_patches/dde.py
# -*- coding: utf-8 -*-
#
# This file is part of EventGhost.
# Copyright © 2005-2020 EventGhost Project <http://www.eventghost.net/>
#
# EventGhost is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option)
# any later version.
#
# EventGhost is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with EventGhost. If not, see <http://www.gnu.org/licenses/>.
# This is a modified version of the dde client made by noisywiz. Original code
# can be found at https://gist.github.com/noisywiz/2661065.
# The modifications make the client work like the pywin32 dde client.
# It does not provide full functionality. More will be added as it is needed.
from ctypes import (
POINTER,
WINFUNCTYPE,
c_char_p,
c_void_p,
c_int,
c_ulong,
c_char_p
)
from ctypes.wintypes import (
BOOL,
DWORD,
BYTE,
INT,
LPCWSTR,
UINT,
ULONG
)
import time
HCONV = c_void_p
HDDEDATA = c_void_p
HSZ = c_void_p
LPBYTE = c_char_p
LPDWORD = POINTER(DWORD)
LPSTR = c_char_p
ULONG_PTR = c_ulong
# See windows/ddeml.h for declaration of struct CONVCONTEXT
PCONVCONTEXT = c_void_p
DMLERR_NO_ERROR = 0
# Predefined Clipboard Formats
CF_TEXT = 1
CF_BITMAP = 2
CF_METAFILEPICT = 3
CF_SYLK = 4
CF_DIF = 5
CF_TIFF = 6
CF_OEMTEXT = 7
CF_DIB = 8
CF_PALETTE = 9
CF_PENDATA = 10
CF_RIFF = 11
CF_WAVE = 12
CF_UNICODETEXT = 13
CF_ENHMETAFILE = 14
CF_HDROP = 15
CF_LOCALE = 16
CF_DIBV5 = 17
CF_MAX = 18
DDE_FACK = 0x8000
DDE_FBUSY = 0x4000
DDE_FDEFERUPD = 0x4000
DDE_FACKREQ = 0x8000
DDE_FRELEASE = 0x2000
DDE_FREQUESTED = 0x1000
DDE_FAPPSTATUS = 0x00FF
DDE_FNOTPROCESSED = 0x0000
DDE_FACKRESERVED = (~(DDE_FACK | DDE_FBUSY | DDE_FAPPSTATUS))
DDE_FADVRESERVED = (~(DDE_FACKREQ | DDE_FDEFERUPD))
DDE_FDATRESERVED = (~(DDE_FACKREQ | DDE_FRELEASE | DDE_FREQUESTED))
DDE_FPOKRESERVED = (~(DDE_FRELEASE))
XTYPF_NOBLOCK = 0x0002
XTYPF_NODATA = 0x0004
XTYPF_ACKREQ = 0x0008
XCLASS_MASK = 0xFC00
XCLASS_BOOL = 0x1000
XCLASS_DATA = 0x2000
XCLASS_FLAGS = 0x4000
XCLASS_NOTIFICATION = 0x8000
XTYP_ERROR = (0x0000 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_ADVDATA = (0x0010 | XCLASS_FLAGS)
XTYP_ADVREQ = (0x0020 | XCLASS_DATA | XTYPF_NOBLOCK)
XTYP_ADVSTART = (0x0030 | XCLASS_BOOL)
XTYP_ADVSTOP = (0x0040 | XCLASS_NOTIFICATION)
XTYP_EXECUTE = (0x0050 | XCLASS_FLAGS)
XTYP_CONNECT = (0x0060 | XCLASS_BOOL | XTYPF_NOBLOCK)
XTYP_CONNECT_CONFIRM = (0x0070 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_XACT_COMPLETE = (0x0080 | XCLASS_NOTIFICATION)
XTYP_POKE = (0x0090 | XCLASS_FLAGS)
XTYP_REGISTER = (0x00A0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_REQUEST = (0x00B0 | XCLASS_DATA)
XTYP_DISCONNECT = (0x00C0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_UNREGISTER = (0x00D0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_WILDCONNECT = (0x00E0 | XCLASS_DATA | XTYPF_NOBLOCK)
XTYP_MONITOR = (0x00F0 | XCLASS_NOTIFICATION | XTYPF_NOBLOCK)
XTYP_MASK = 0x00F0
XTYP_SHIFT = 4
TIMEOUT_ASYNC = 0xFFFFFFFF
def get_winfunc(libname, funcname, restype=None, argtypes=(), _libcache={}):
"""Retrieve a function from a library, and set the data types."""
from ctypes import windll
if libname not in _libcache:
_libcache[libname] = windll.LoadLibrary(libname)
func = getattr(_libcache[libname], funcname)
func.argtypes = argtypes
func.restype = restype
return func
DDECALLBACK = WINFUNCTYPE(
HDDEDATA,
UINT,
UINT,
HCONV,
HSZ,
HSZ,
HDDEDATA,
ULONG_PTR,
ULONG_PTR
)
class DDE(object):
"""Object containing all the DDE functions"""
AccessData = get_winfunc(
"user32",
"DdeAccessData",
LPBYTE,
(HDDEDATA, LPDWORD)
)
ClientTransaction = get_winfunc(
"user32",
"DdeClientTransaction",
HDDEDATA,
(LPBYTE, DWORD, HCONV, HSZ, UINT, UINT, DWORD, LPDWORD)
)
Connect = get_winfunc(
"user32",
"DdeConnect",
HCONV,
(DWORD, HSZ, HSZ, PCONVCONTEXT)
)
CreateStringHandle = get_winfunc(
"user32",
"DdeCreateStringHandleW",
HSZ,
(DWORD, LPCWSTR, UINT)
)
Disconnect = get_winfunc(
"user32",
"DdeDisconnect",
BOOL,
(HCONV,)
)
GetLastError = get_winfunc(
"user32",
"DdeGetLastError",
UINT,
(DWORD,)
)
Initialize = get_winfunc(
"user32",
"DdeInitializeW",
UINT,
(LPDWORD, DDECALLBACK, DWORD, DWORD)
)
FreeDataHandle = get_winfunc(
"user32",
"DdeFreeDataHandle",
BOOL,
(HDDEDATA,)
)
FreeStringHandle = get_winfunc(
"user32",
"DdeFreeStringHandle",
BOOL,
(DWORD, HSZ)
)
QueryString = get_winfunc(
"user32",
"DdeQueryStringA",
DWORD,
(DWORD, HSZ, LPSTR, DWORD, c_int)
)
UnaccessData = get_winfunc("user32", "DdeUnaccessData", BOOL, (HDDEDATA,))
Uninitialize = get_winfunc("user32", "DdeUninitialize", BOOL, (DWORD,))
class DDEError(RuntimeError):
"""Exception raise when a DDE errpr occures."""
def __init__(self, msg, idInst=None):
if idInst is None:
RuntimeError.__init__(self, msg)
else:
RuntimeError.__init__(self,
"%s (err=%s)" % (msg, hex(DDE.GetLastError(idInst))))
class DDEClient(object):
"""The DDEClient class.
Use this class to create and manage a connection to a service/topic. To get
classbacks subclass DDEClient and overwrite callback."""
def __init__(self):
"""Create a connection to a service/topic."""
from ctypes import byref
self._idInst = DWORD(0)
self._hConv = HCONV()
self._callback = DDECALLBACK(self._callback)
res = DDE.Initialize(
byref(self._idInst),
self._callback,
0x00000010,
0
)
if res != DMLERR_NO_ERROR:
raise DDEError("Unable to register with DDEML (err=%s)" % hex(res))
def Create(self, *args):
pass
def ConnectTo(self, service, topic):
hszService = DDE.CreateStringHandle(self._idInst, service, 1200)
hszTopic = DDE.CreateStringHandle(self._idInst, topic, 1200)
self._hConv = DDE.Connect(
self._idInst,
hszService,
hszTopic,
PCONVCONTEXT()
)
DDE.FreeStringHandle(self._idInst, hszTopic)
DDE.FreeStringHandle(self._idInst, hszService)
def Connected(self):
return bool(self._hConv)
def Shutdown(self):
if self._hConv:
DDE.Disconnect(self._hConv)
if self._idInst:
DDE.Uninitialize(self._idInst)
def __del__(self):
"""Cleanup any active connections."""
if self._hConv:
DDE.Disconnect(self._hConv)
if self._idInst:
DDE.Uninitialize(self._idInst)
def advise(self, item, stop=False):
"""Request updates when DDE data changes."""
from ctypes import byref
hszItem = DDE.CreateStringHandle(self._idInst, item, 1200)
hDdeData = DDE.ClientTransaction(LPBYTE(), 0, self._hConv, hszItem,
CF_TEXT, XTYP_ADVSTOP if stop else XTYP_ADVSTART, TIMEOUT_ASYNC,
LPDWORD())
DDE.FreeStringHandle(self._idInst, hszItem)
if not hDdeData:
raise DDEError(
"Unable to %s advise" % ("stop" if stop else "start"),
self._idInst
)
DDE.FreeDataHandle(hDdeData)
def Exec(self, command, timeout=5000):
"""Execute a DDE command."""
pData = c_char_p(command)
cbData = DWORD(len(command) + 1)
hDdeData = DDE.ClientTransaction(
pData,
cbData,
self._hConv,
HSZ(),
CF_TEXT,
XTYP_EXECUTE,
timeout,
LPDWORD()
)
if not hDdeData:
raise DDEError("Unable to send command", self._idInst)
DDE.FreeDataHandle(hDdeData)
def request(self, item, timeout=5000):
"""Request data from DDE service."""
from ctypes import byref
hszItem = DDE.CreateStringHandle(self._idInst, item, 1200)
hDdeData = DDE.ClientTransaction(
LPBYTE(),
0,
self._hConv,
hszItem,
CF_TEXT,
XTYP_REQUEST,
timeout,
LPDWORD()
)
DDE.FreeStringHandle(self._idInst, hszItem)
if not hDdeData:
raise DDEError("Unable to request item", self._idInst)
pData = None
if timeout != TIMEOUT_ASYNC:
pdwSize = DWORD(0)
try:
pData = DDE.AccessData(hDdeData, byref(pdwSize))
except:
pData = None
if not pData:
time.sleep(0.05)
pData = self.request(item)
return pData
def callback(self, value, item=None):
"""Calback function for advice."""
print "%s: %s" % (item, value)
def _callback(
self,
wType,
uFmt,
hConv,
hsz1,
hsz2,
hDdeData,
dwData1,
dwData2
):
# if wType == XTYP_ADVDATA:
from ctypes import byref, create_string_buffer
dwSize = DWORD(0)
pData = DDE.AccessData(hDdeData, byref(dwSize))
if pData:
item = create_string_buffer('\000' * 128)
DDE.QueryString(self._idInst, hsz2, item, 128, 1004)
self.callback(pData, item.value)
DDE.UnaccessData(hDdeData)
return DDE_FACK
def WinMSGLoop():
"""Run the main windows message loop."""
from ctypes import POINTER, byref, c_ulong
from ctypes.wintypes import BOOL, HWND, MSG, UINT
LPMSG = POINTER(MSG)
LRESULT = c_ulong
GetMessage = get_winfunc(
"user32",
"GetMessageW",
BOOL,
(LPMSG, HWND, UINT, UINT)
)
TranslateMessage = get_winfunc(
"user32",
"TranslateMessage",
BOOL,
(LPMSG,)
)
# restype = LRESULT
DispatchMessage = get_winfunc(
"user32",
"DispatchMessageW",
LRESULT,
(LPMSG,)
)
msg = MSG()
lpmsg = byref(msg)
while GetMessage(lpmsg, HWND(), 0, 0) > 0:
TranslateMessage(lpmsg)
DispatchMessage(lpmsg)
def CreateServer():
return DDEClient()
def CreateConversation(server):
return server