freqtrade/rpc/rpc_manager.py
"""
This module contains class to manage RPC communications (Telegram, API, ...)
"""
import logging
from collections import deque
from typing import List
from freqtrade.constants import Config
from freqtrade.enums import NO_ECHO_MESSAGES, RPCMessageType
from freqtrade.rpc import RPC, RPCHandler
from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
class RPCManager:
"""
Class to manage RPC objects (Telegram, API, ...)
"""
def __init__(self, freqtrade) -> None:
""" Initializes all enabled rpc modules """
self.registered_modules: List[RPCHandler] = []
self._rpc = RPC(freqtrade)
config = freqtrade.config
# Enable telegram
if config.get('telegram', {}).get('enabled', False):
logger.info('Enabling rpc.telegram ...')
from freqtrade.rpc.telegram import Telegram
self.registered_modules.append(Telegram(self._rpc, config))
# Enable discord
if config.get('discord', {}).get('enabled', False):
logger.info('Enabling rpc.discord ...')
from freqtrade.rpc.discord import Discord
self.registered_modules.append(Discord(self._rpc, config))
# Enable Webhook
if config.get('webhook', {}).get('enabled', False):
logger.info('Enabling rpc.webhook ...')
from freqtrade.rpc.webhook import Webhook
self.registered_modules.append(Webhook(self._rpc, config))
# Enable local rest api server for cmd line control
if config.get('api_server', {}).get('enabled', False):
logger.info('Enabling rpc.api_server')
from freqtrade.rpc.api_server import ApiServer
apiserver = ApiServer(config)
apiserver.add_rpc_handler(self._rpc)
self.registered_modules.append(apiserver)
def cleanup(self) -> None:
""" Stops all enabled rpc modules """
logger.info('Cleaning up rpc modules ...')
while self.registered_modules:
mod = self.registered_modules.pop()
logger.info('Cleaning up rpc.%s ...', mod.name)
mod.cleanup()
del mod
def send_msg(self, msg: RPCSendMsg) -> None:
"""
Send given message to all registered rpc modules.
A message consists of one or more key value pairs of strings.
e.g.:
{
'status': 'stopping bot'
}
"""
if msg.get('type') not in NO_ECHO_MESSAGES:
logger.info('Sending rpc message: %s', msg)
for mod in self.registered_modules:
logger.debug('Forwarding message to rpc.%s', mod.name)
try:
mod.send_msg(msg)
except NotImplementedError:
logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.")
except Exception:
logger.exception('Exception occurred within RPC module %s', mod.name)
def process_msg_queue(self, queue: deque) -> None:
"""
Process all messages in the queue.
"""
while queue:
msg = queue.popleft()
logger.info('Sending rpc strategy_msg: %s', msg)
for mod in self.registered_modules:
if mod._config.get(mod.name, {}).get('allow_custom_messages', False):
mod.send_msg({
'type': RPCMessageType.STRATEGY_MSG,
'msg': msg,
})
def startup_messages(self, config: Config, pairlist, protections) -> None:
if config['dry_run']:
self.send_msg({
'type': RPCMessageType.WARNING,
'status': 'Dry run is enabled. All trades are simulated.'
})
stake_currency = config['stake_currency']
stake_amount = config['stake_amount']
minimal_roi = config['minimal_roi']
stoploss = config['stoploss']
trailing_stop = config['trailing_stop']
timeframe = config['timeframe']
exchange_name = config['exchange']['name']
strategy_name = config.get('strategy', '')
pos_adjust_enabled = 'On' if config['position_adjustment_enable'] else 'Off'
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'*Exchange:* `{exchange_name}`\n'
f'*Stake per trade:* `{stake_amount} {stake_currency}`\n'
f'*Minimum ROI:* `{minimal_roi}`\n'
f'*{"Trailing " if trailing_stop else ""}Stoploss:* `{stoploss}`\n'
f'*Position adjustment:* `{pos_adjust_enabled}`\n'
f'*Timeframe:* `{timeframe}`\n'
f'*Strategy:* `{strategy_name}`'
})
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'Searching for {stake_currency} pairs to buy and sell '
f'based on {pairlist.short_desc()}'
})
if len(protections.name_list) > 0:
prots = '\n'.join([p for prot in protections.short_desc() for k, p in prot.items()])
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'Using Protections: \n{prots}'
})