eg/Classes/Log.py
# -*- coding: utf-8 -*-
#
# This file is part of EventGhost.
# Copyright © 2005-2020 EventGhost Project <http://www.eventghost.net/>
#
# EventGhost is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option)
# any later version.
#
# EventGhost is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with EventGhost. If not, see <http://www.gnu.org/licenses/>.
import codecs
import sys
import wx
from collections import deque
from threading import currentThread
from time import strftime, time
from traceback import extract_tb, format_exception_only, format_stack
from types import UnicodeType
from weakref import ref
# Local imports
import eg
_oldStdOut = sys.stdout
_oldStdErr = sys.stderr
oldStdOut = codecs.lookup("ascii").streamwriter(_oldStdOut, 'backslashreplace')
oldStdErr = codecs.lookup("ascii").streamwriter(_oldStdErr, 'backslashreplace')
INFO_ICON = eg.Icons.INFO_ICON
ERROR_ICON = eg.Icons.ERROR_ICON
NOTICE_ICON = eg.Icons.NOTICE_ICON
DEBUG_ICON = eg.Icons.DEBUG_ICON
WARNING_ICON = eg.Icons.WARNING_ICON
def _build_notice(icon, args):
strs = [
str(eg.Tasklet.GetCurrentId()),
str(currentThread().getName()) + ":"
]
for arg in args:
arg = str(arg).strip()
if icon == WARNING_ICON:
arg = arg.replace('Traceback', 'Warning')
strs += [arg]
msg = ' '.join(strs)
if icon == DEBUG_ICON:
msg = 'DEBUG: ' + msg.replace('\n', '\nDEBUG: ')
elif icon == WARNING_ICON:
msg = 'WARNING: ' + msg.replace('\n', '\nWARNING: ')
std_msg = (
strftime("%H:%M:%S: ") +
msg.replace('\n', '\n' + strftime("%H:%M:%S: "))
) + '\n'
msg += "\n"
try:
oldStdErr.write(std_msg)
except:
oldStdErr.write(std_msg.decode("mbcs"))
return msg
class DummyLogCtrl(object):
def WriteLine(self, line, icon, wRef, when, indent):
#oldStdOut.write("%s\n" % line)
pass
class Log(object):
def __init__(self):
self.logListeners = []
self.eventListeners = []
self.NativeLog = True
self.buffer = ""
self.data = deque()
self.maxlength = 5000
self.ctrl = DummyLogCtrl()
log = self
class StdOut:
def write(self, data):
log.Write(data, INFO_ICON)
if eg.debugLevel:
try:
oldStdOut.write(data)
except:
oldStdOut.write(data.decode("mbcs"))
class StdErr:
def write(self, data):
log.Write(data, ERROR_ICON)
if eg.debugLevel:
try:
oldStdErr.write(data)
except:
oldStdErr.write(data.decode("mbcs"))
if eg.startupArguments.isMain:
sys.stdout = StdOut()
sys.stderr = StdErr()
if eg.debugLevel == 2:
if hasattr(_oldStdErr, "_displayMessage"):
_oldStdErr._displayMessage = False
if eg.debugLevel:
import platform
import warnings
warnings.simplefilter('error', UnicodeWarning)
self.PrintDebugNotice("----------------------------------------")
self.PrintDebugNotice(" {0} started".format(eg.APP_NAME))
self.PrintDebugNotice("----------------------------------------")
self.PrintDebugNotice(eg.APP_NAME, "Version:", eg.Version.string)
self.PrintDebugNotice("Machine type:", platform.machine())
self.PrintDebugNotice("Processor:", platform.processor())
self.PrintDebugNotice("Architecture:", platform.architecture())
self.PrintDebugNotice(
"Python:",
platform.python_branch(),
platform.python_version(),
platform.python_implementation(),
platform.python_build(),
"[{0}]".format(platform.python_compiler())
)
self.PrintDebugNotice("----------------------------------------")
# redirect all wxPython error messages to our log
class MyLog(wx.PyLog):
def DoLog(self, level, msg, dummyTimestamp):
if (level >= 6):
return
sys.stderr.write("wxError%d: %s\n" % (level, msg))
wx.Log.SetActiveTarget(MyLog())
def AddEventListener(self, listener):
if listener not in self.eventListeners:
self.eventListeners.append(listener)
def AddLogListener(self, listener):
if listener not in self.logListeners:
self.logListeners.append(listener)
@eg.LogIt
def GetData(self, numLines=-1):
if numLines == -1:
start = 0
end = len(self.data)
elif numLines > len(self.data):
end = len(self.data)
data = list(self.data)
return data[start:end]
def LogEvent(self, event):
"""
Store and display an EventGhostEvent in the logger.
"""
payload = event.payload
eventstring = event.string
if payload is not None:
if type(payload) == UnicodeType:
mesg = eventstring + ' u"' + payload + '"'
else:
mesg = eventstring + ' ' + repr(payload)
else:
mesg = eventstring
self.Write(mesg + "\n", eg.EventItem.icon, eventstring)
def NativeLogOn(self, value):
self.NativeLog = value
def Print(self, *args, **kwargs):
self._Print(args, **kwargs)
def PrintDebugNotice(self, *args):
"""
Logs a message if eg.debugLevel is set.
"""
if eg.debugLevel:
msg = _build_notice(DEBUG_ICON, args)
self.Write(msg, DEBUG_ICON)
def PrintWarningNotice(self, *args):
"""
Logs a message if eg.debugLevel is set.
"""
if eg.debugLevel:
msg = _build_notice(WARNING_ICON, args)
indent = eg.indent
eg.indent = 0
self.Write(msg, WARNING_ICON)
eg.indent = indent
def PrintError(self, *args, **kwargs):
"""
Prints an error message to the logger. The message will get a special
icon and a red colour, so the user can easily identify it as an error
message.
"""
kwargs.setdefault("icon", ERROR_ICON)
self._Print(args, **kwargs)
def PrintNotice(self, *args, **kwargs):
kwargs.setdefault("icon", NOTICE_ICON)
self._Print(args, **kwargs)
def PrintStack(self, skip=0):
strs = [
'Stack trace (most recent call last) (%s):\n' % eg.Version.string
]
strs += format_stack(sys._getframe().f_back)[skip:]
error = "".join(strs)
self.Write(error.rstrip() + "\n", ERROR_ICON)
if eg.debugLevel:
sys.stderr.write(error)
def PrintTraceback(self, msg=None, skip=0, source=None, excInfo=None):
if msg:
self.PrintError(msg, source=source)
if excInfo is None:
excInfo = sys.exc_info()
tbType, tbValue, tbTraceback = excInfo
slist = [
'Traceback (most recent call last) (%s):\n' % eg.Version.string
]
if tbTraceback:
decode = codecs.getdecoder('mbcs')
for fname, lno, funcName, text in extract_tb(tbTraceback)[skip:]:
slist.append(
u' File "%s", line %d, in %s\n' % (
decode(fname)[0], lno, funcName
)
)
if text:
slist.append(" %s\n" % text)
for line in format_exception_only(tbType, tbValue):
slist.append(decode(line)[0])
error = "".join(slist)
if source is not None:
source = ref(source)
self.Write(error.rstrip() + "\n", ERROR_ICON, source)
if eg.debugLevel:
oldStdErr.write(error)
def RemoveEventListener(self, listener):
if listener in self.eventListeners:
self.eventListeners.remove(listener)
def RemoveLogListener(self, listener):
if listener in self.logListeners:
self.logListeners.remove(listener)
def SetCtrl(self, logCtrl):
if logCtrl is not None:
self.ctrl = logCtrl
else:
self.ctrl = DummyLogCtrl()
def Write(self, text, icon, wRef=None):
try:
lines = (self.buffer + text).split("\n")
except UnicodeDecodeError:
lines = (self.buffer + text.decode("mbcs")).split("\n")
self.buffer = lines[-1]
data = self.data
when = time()
for line in lines[:-1]:
data.append((line, icon, wRef, when, eg.indent))
wx.CallAfter(self._WriteLine, line, icon, wRef, when, eg.indent)
if len(data) >= self.maxlength:
data.popleft()
def _Print(self, args, sep=" ", end="\n", icon=INFO_ICON, source=None):
if source is not None:
source = ref(source)
#strs = [unicode(arg) for arg in args]
self.Write(sep.join(args) + end, icon, source)
def _WriteLine(self, line, icon, wRef, when, indent):
if self.NativeLog:
self.ctrl.WriteLine(line, icon, wRef, when, indent)
for listener in self.logListeners:
listener.WriteLine(line, icon, wRef, when, indent)