# tt/plugins/plugin_manager.py
import asyncio
import importlib
import pkgutil
from datetime import datetime, timezone
from asyncz.triggers import CronTrigger, IntervalTrigger
from tt.config import logger, scheduler, settings
from tt.utils.notifications import Notifier
class PluginManager:
    """
    🔌 Plugins are the core of Talky Trader: they are loaded at
    startup to interact with the trading platform.

    The Plugin Manager imports plugin modules, instantiates every
    ``BasePlugin`` subclass found in them, starts the instances and
    dispatches incoming messages to them.

    Args:
        plugin_directory (str): Dotted import path of the package that
            contains the plugins. Falls back to
            ``settings.plugin_directory`` when empty or None.
    """

    def __init__(self, plugin_directory=None):
        self.plugin_directory = plugin_directory or settings.plugin_directory
        # Instantiated plugins, in load order.
        self.plugins = []

    def load_plugins(self, plugin_names=None):
        """
        🔌 Load plugins from the plugin package.

        A failure while importing or instantiating one plugin is logged
        as a warning and does not prevent the remaining plugins from
        loading.

        Args:
            plugin_names (list): Names of the plugin modules to load.
                If empty or None, every module found in the plugin
                package is loaded. Passing an explicit list can be used
                to reduce load time, memory and CPU.

        Returns:
            None

        Raises:
            ImportError: If the plugin package itself cannot be
                imported (that import is not guarded).
        """
        package = importlib.import_module(self.plugin_directory)
        logger.debug("Loading plugins from: {}", package)
        if not plugin_names:
            plugin_names = [
                name for _, name, _ in pkgutil.iter_modules(package.__path__)
            ]

        for plugin_name in plugin_names:
            try:
                module = importlib.import_module(
                    f"{self.plugin_directory}.{plugin_name}"
                )
                self.load_plugin(module, plugin_name)
            except Exception as e:
                # Best effort: one broken plugin must not block the others.
                logger.warning("Error loading plugin {}: {}", plugin_name, e)

    def load_plugin(self, module, plugin_name):
        """
        Instantiate every plugin class defined in or imported by a module.

        Each class in the module namespace that is a strict subclass of
        ``BasePlugin`` is instantiated without arguments and appended to
        ``self.plugins``.

        Args:
            module (Module): Imported plugin module to scan.
            plugin_name (str): Plugin name (kept for interface
                compatibility; not used by the scan itself).

        Returns:
            None
        """
        # Iterate over a snapshot so that a plugin constructor touching the
        # module namespace cannot invalidate the iteration.
        for name, obj in list(vars(module).items()):
            if (
                isinstance(obj, type)
                and issubclass(obj, BasePlugin)
                and obj is not BasePlugin
            ):
                plugin_instance = obj()
                self.plugins.append(plugin_instance)
                logger.debug("Plugin loaded: {}", name)

    async def start_all_plugins(self):
        """
        Start every loaded plugin, then start the scheduler.

        Plugins are started sequentially in load order.

        Returns:
            None
        """
        for plugin in self.plugins:
            await self.start_plugin(plugin)
        scheduler.start()

    async def start_plugin(self, plugin):
        """
        Start a single plugin.

        Args:
            plugin (Plugin): Plugin instance whose ``start`` coroutine
                is awaited.

        Returns:
            None
        """
        await plugin.start()

    async def process_message(self, message):
        """
        Send a message to every plugin that wants to handle it.

        Handlers run concurrently. A failure in one plugin, whether in
        ``should_handle`` or in ``handle_message``, is logged and does
        not interrupt the other plugins or propagate to the caller.

        Args:
            message (str): Message to dispatch. Empty messages are
                ignored.

        Returns:
            None
        """
        # logger.debug("Processing: {}", message)
        if not message:
            return

        dispatched = []
        for plugin in self.plugins:
            try:
                if plugin.should_handle(message):
                    task = asyncio.create_task(plugin.handle_message(message))
                    dispatched.append((plugin, task))
            except Exception as error:
                logger.error("process {}: {}", plugin, error)

        # return_exceptions=True keeps one failing handler from hiding the
        # outcome of the others; failures are reported per plugin below.
        outcomes = await asyncio.gather(
            *(task for _, task in dispatched), return_exceptions=True
        )
        for (plugin, _), outcome in zip(dispatched, outcomes):
            if isinstance(outcome, BaseException):
                logger.error("process {}: {}", plugin, outcome)
class BasePlugin:
    """
    ⚡ Base Plugin Class

    This class is inherited by Talky plugins for scheduling,
    notification and message handling.

    Scheduling is managed via the asyncz library.
    More info: https://github.com/tarsil/asyncz

    Every instance copies the bot and trading settings it needs from
    the global ``settings`` object at construction time and starts
    disabled (``enabled = False``).
    """

    def __init__(self):
        self.enabled = False
        # Bot Settings
        self.bot_name = settings.bot_name
        self.bot_prefix = settings.bot_prefix or "/"
        self.bot_command_help = settings.bot_command_help
        self.bot_command_info = settings.bot_command_info
        self.bot_command_bal = settings.bot_command_bal
        self.bot_command_pos = settings.bot_command_pos
        self.bot_command_quote = settings.bot_command_quote
        self.notifier = Notifier()
        self.scheduler = scheduler
        self.user_day_of_week = settings.user_day_of_week
        self.user_hours = settings.user_hours
        self.user_timezone = settings.user_timezone
        self.bot_filter_out = settings.bot_ignore or []
        self.bot_filter_in = settings.bot_filter_in or []
        self.plugin_enabled = settings.plugin_enabled
        self.plugin_directory = settings.plugin_directory
        self.authorized_plugins = settings.authorized_plugins
        self.ui_enabled = settings.ui_enabled
        self.forwarder = settings.forwarder
        # Trading Settings
        self.trading_enabled = settings.trading_enabled
        self.trading_status_message = settings.trading_status_message
        self.trading_status_enabled = settings.trading_status_enabled
        self.trading_status_disabled = settings.trading_status_disabled
        self.trading_control = settings.trading_control
        self.trading_control_message = settings.trading_control_message
        self.trading_days_allowed = settings.trading_days_allowed
        self.trading_hours_start = settings.trading_hours_start
        self.trading_hours_end = settings.trading_hours_end
        self.trading_blackout_dates = settings.trading_blackout_dates

    async def start(self):
        """Start the plugin. No-op by default; override in subclasses."""
        pass

    async def stop(self):
        """Stop the plugin. No-op by default; override in subclasses."""
        pass

    async def send_notification(self, message):
        """
        Forward a message to the notifier, only if the plugin is enabled.

        Args:
            message (str): Message to send.

        Returns:
            None
        """
        if self.enabled:
            await self.notifier.notify(message)

    def should_filter(self, message):
        """
        Return True if the plugin should NOT handle the message.

        That is the case when the plugin is not enabled, or when the
        message starts with one of the entries of ``bot_filter_out``
        (taken from the ``bot_ignore`` setting).

        Args:
            message (str): Message

        Returns:
            bool
        """
        if not self.enabled:
            return True
        return any(message.startswith(word) for word in self.bot_filter_out)

    def should_filter_in(self, message):
        """
        Return True if a ``bot_filter_in`` word is found in the message.

        ``bot_filter_in`` may be a single string (substring test) or a
        collection of strings (True if any of them occurs in the
        message). An empty collection or None yields False.

        Args:
            message (str): Message

        Returns:
            bool
        """
        wanted = self.bot_filter_in
        if isinstance(wanted, str):
            return wanted in message
        # The setting defaults to a list; `list in str` would raise TypeError.
        return any(word in message for word in wanted or ())

    def should_handle(self, message):
        """
        Determine if the plugin should handle the message.

        The base implementation accepts every message as long as the
        plugin is enabled.

        Args:
            message (str): The message to be checked.

        Returns:
            bool: True if the plugin should handle the message,
            False otherwise.
        """
        return bool(self.enabled)

    def is_command_to_handle(self, message):
        """
        Determine if the message is a bot command.

        Args:
            message (str): The message to be checked.

        Returns:
            bool: True if the message starts with ``bot_prefix``,
            False otherwise.
        """
        return bool(message.startswith(self.bot_prefix))

    async def plugin_notify_schedule_task(
        self, user_name=None, frequency=8, frequency_unit="hours", function=None
    ):
        """
        Register a notification task that runs every X hours or minutes.

        Defaults to every 8 hours. Nothing is scheduled when
        ``function`` is not provided.

        Args:
            user_name (str): Name given to the scheduled task
            frequency (int): Interval length
            frequency_unit (str): ``"hours"`` or ``"minutes"``
            function (function): Coroutine function producing the
                notification text

        Returns:
            None

        Raises:
            ValueError: If ``frequency_unit`` is neither ``"hours"``
                nor ``"minutes"``.
        """
        if frequency_unit not in ("hours", "minutes"):
            raise ValueError("Invalid frequency unit. Must be 'hours' or 'minutes'.")
        trigger = IntervalTrigger(**{frequency_unit: frequency})

        if function:
            # NOTE(review): `function` is awaited once, here, when the task is
            # registered; the resulting text is what gets handed to the
            # scheduler as the task argument. Confirm this is intended rather
            # than re-evaluating `function` on every run.
            self.scheduler.add_task(
                name=user_name,
                fn=self.send_notification,
                args=[f"{await function()}"],
                trigger=trigger,
                is_enabled=True,
            )

    async def plugin_notify_cron_task(
        self,
        user_name=None,
        user_day_of_week=None,
        user_hours=None,
        user_timezone=None,
        function=None,
    ):
        """
        Register a cron-scheduled notification task.

        Any schedule argument left empty falls back to the matching
        value copied from settings (``user_day_of_week``,
        ``user_hours``, ``user_timezone``). Nothing is scheduled when
        ``function`` is not provided.

        Args:
            user_name (str): Name given to the scheduled task
            user_day_of_week (str): Day of week
            user_hours (str): Hours
            user_timezone (str): Timezone
            function (function): Coroutine function producing the
                notification text

        Returns:
            None
        """
        user_day_of_week = user_day_of_week or self.user_day_of_week
        user_hours = user_hours or self.user_hours
        user_timezone = user_timezone or self.user_timezone

        if function:
            # NOTE(review): `function` is awaited once at registration time;
            # see plugin_notify_schedule_task for the same pattern.
            self.scheduler.add_task(
                name=user_name,
                fn=self.send_notification,
                args=[f"{await function()}"],
                trigger=CronTrigger(
                    day_of_week=user_day_of_week,
                    hour=user_hours,
                    timezone=user_timezone,
                ),
                is_enabled=True,
            )

    async def handle_message(self, msg):
        """
        Handle an incoming message.

        This is the function to override in your plugin to handle
        incoming messages. The base implementation does nothing.

        Args:
            msg (str): The incoming message.

        Returns:
            None
        """
        pass
        # Example implementation for a subclass:
        # if not self.should_handle(msg):
        #     return
        # command, *args = msg.split(" ")
        # command = command[1:]
        # command_mapping = self.get_command_mapping()
        # if command in command_mapping:
        #     function = command_mapping[command]
        #     await self.send_notification(f"{await function()}")

    def should_handle_timeframe(self):
        """
        Return True if the current UTC day and time are within the
        configured trading window.

        Use it to control trading hours for plugins: it allows blocking
        order processing outside of the trading hours defined in
        settings. When ``trading_control`` is disabled the window is
        always considered open.

        Returns:
            bool
        """
        if not self.trading_control:
            return True

        logger.debug("Trading control enabled")
        # Sample the clock once so day and time cannot disagree at midnight.
        now = datetime.now(timezone.utc)
        start_time = datetime.strptime(self.trading_hours_start, "%H:%M").time()
        end_time = datetime.strptime(self.trading_hours_end, "%H:%M").time()
        logger.debug(
            "Current time: {}, Current day: {}, Start time: {}, End time: {}",
            now.time(),
            now.strftime("%a").lower(),
            start_time,
            end_time,
        )
        control = self._is_trading_window_open(now, start_time, end_time)
        logger.debug("Trading control: {}", control)
        return control

    def _is_trading_window_open(self, now, start_time, end_time):
        """
        Decide whether ``now`` falls inside the trading window.

        The window is open when the lowercase weekday abbreviation of
        ``now`` is in ``trading_days_allowed``, its time of day lies
        between ``start_time`` and ``end_time`` (inclusive), and neither
        the weekday abbreviation nor the ISO date of ``now`` is listed
        in ``trading_blackout_dates``.

        Args:
            now (datetime): Moment to evaluate
            start_time (time): Start of the daily window
            end_time (time): End of the daily window

        Returns:
            bool
        """
        current_day = now.strftime("%a").lower()
        blackout = self.trading_blackout_dates or []
        if not isinstance(blackout, str):
            # Normalise entries so date objects and strings compare alike.
            # NOTE(review): assumes blackout dates are written as YYYY-MM-DD
            # (or are date objects); confirm against the settings file.
            blackout = [str(entry) for entry in blackout]

        return (
            current_day in (self.trading_days_allowed or [])
            and start_time <= now.time() <= end_time
            and current_day not in blackout
            and now.date().isoformat() not in blackout
        )