miniirc.py
#!/usr/bin/python3
#
# miniirc - A small-ish IRC framework.
#
# © 2018-2022 by luk3yx and other contributors of miniirc.
#
import atexit, threading, time, select, socket, ssl, sys, warnings
# The version string and tuple
ver = __version_info__ = (1, 9, 1)
version = 'miniirc IRC framework v1.9.1'
__version__ = '1.9.1'
# __all__ and _default_caps
__all__ = ['CmdHandler', 'Handler', 'IRC']
_default_caps = {'account-tag', 'away-notify', 'cap-notify', 'chghost',
'draft/message-tags-0.2', 'invite-notify', 'message-tags',
'oragono.io/maxline-2', 'server-time', 'sts'}
# Get the certificate list.
try:
from certifi import where as get_ca_certs
except ImportError:
def get_ca_certs():
pass
# Create global handlers
_global_handlers = {}
_colon_warning = False
def _add_handler(handlers, events, ircv3, cmd_arg, colon):
if (colon and _colon_warning and
not all(str(e).upper().startswith('IRCV3 ') for e in events)):
warnings.warn('Using colon=True or not specifying the colon '
'keyword argument to miniirc.Handler is deprecated.',
DeprecationWarning, stacklevel=3)
if not events:
if not cmd_arg:
raise TypeError('Handler() called without arguments.')
events = (None,)
def add_handler(func):
for event in events:
if event is not None:
event = str(event).upper()
if event not in handlers:
handlers[event] = []
if func not in handlers[event]:
handlers[event].append(func)
f = getattr(func, '__func__', func)
if ircv3:
f.miniirc_ircv3 = True
if cmd_arg:
f.miniirc_cmd_arg = True
if colon:
f.miniirc_colon = True
return func
return add_handler
def Handler(*events, ircv3=False, colon=True):
return _add_handler(_global_handlers, events, ircv3, False, colon)
def CmdHandler(*events, ircv3=False, colon=True):
return _add_handler(_global_handlers, events, ircv3, True, colon)
# Parse IRCv3 tags
ircv3_tag_escapes = {':': ';', 's': ' ', 'r': '\r', 'n': '\n'}
def _tags_to_dict(tag_list, separator=';'):
tags = {}
if separator:
tag_list = tag_list.split(separator)
for tag in tag_list:
tag = tag.split('=', 1)
if len(tag) == 1:
tags[tag[0]] = True
elif len(tag) == 2:
if '\\' in tag[1]: # Iteration is bad, only do it if required.
value = ''
escape = False
for char in tag[1]: # TODO: Remove this iteration.
if escape:
value += ircv3_tag_escapes.get(char, char)
escape = False
elif char == '\\':
escape = True
else:
value += char
else:
value = tag[1] or True
tags[tag[0]] = value
return tags
# Create the IRCv2/3 parser
def ircv3_message_parser(msg):
n = msg.split(' ')
# Process IRCv3 tags
if n[0].startswith('@'):
tags = _tags_to_dict(n.pop(0)[1:])
else:
tags = {}
# Process arguments
if n[0].startswith(':'):
while len(n) < 2:
n.append('')
hostmask = n[0][1:].split('!', 1)
if len(hostmask) < 2:
hostmask.append(hostmask[0])
i = hostmask[1].split('@', 1)
if len(i) < 2:
i.append(i[0])
hostmask = (hostmask[0], i[0], i[1])
cmd = n[1]
else:
cmd = n[0]
hostmask = (cmd, cmd, cmd)
n.insert(0, '')
# Get the command and arguments
args = []
c = 1
for i in n[2:]:
c += 1
if i.startswith(':'):
args.append(' '.join(n[c:]))
break
else:
args.append(i)
# Return the parsed data
return cmd, hostmask, tags, args
# Escape tags
def _escape_tag(tag):
tag = str(tag).replace('\\', '\\\\')
for i in ircv3_tag_escapes:
tag = tag.replace(ircv3_tag_escapes[i], '\\' + i)
return tag
# Convert a dict into an IRCv3 tags string
def _dict_to_tags(tags):
res = b'@'
for tag in tags:
if tags[tag]:
etag = _escape_tag(tag).replace('=', '-')
if isinstance(tags[tag], str):
etag += '=' + _escape_tag(tags[tag])
etag = (etag + ';').encode('utf-8')
if len(res) + len(etag) > 4094:
break
res += etag
if len(res) < 3:
return b''
return res[:-1] + b' '
# A wrapper for callable logfiles
class _Logfile:
__slots__ = ('_buffer', '_func', '_lock')
def write(self, data):
with self._lock:
self._buffer += data
while '\n' in self._buffer:
line, self._buffer = self._buffer.split('\n', 1)
self._func(line)
def __init__(self, func):
self._buffer = ''
self._func = func
self._lock = threading.Lock()
# Replace invalid RFC1459 characters with Unicode lookalikes
def _prune_arg(arg):
if arg.startswith(':'):
arg = '\u0703' + arg[1:]
return arg.replace(' ', '\xa0').replace('\r', '\xa0').replace('\n', '\xa0')
# Create the IRC class
class IRC:
connected = None
debug_file = sys.stdout
sendq = None
msglen = 512
_main_thread = None
_sasl = False
_unhandled_caps = None
# This will no longer be an alias in miniirc v2.0.0.
# This is still a property to avoid breaking miniirc_matrix
current_nick = property(lambda self: self._current_nick)
# For backwards compatibility, irc.nick will return the current nickname.
# However, changing irc.nick will change the desired nickname as well
# TODO: Consider changing what irc.nick does if it won't break anything or
# making desired_nick public
@current_nick.setter
def nick(self, new_nick):
self._desired_nick = new_nick
self._current_nick = new_nick
def __init__(self, ip, port, nick, channels=None, *, ssl=None, ident=None,
realname=None, persist=True, debug=False, ns_identity=None,
auto_connect=True, ircv3_caps=None, connect_modes=None,
quit_message='I grew sick and died.', ping_interval=60,
ping_timeout=None, verify_ssl=True, executor=None):
# Set basic variables
self.ip = ip
self.port = int(port)
self.nick = nick
if isinstance(channels, str):
channels = map(str.lstrip, channels.split(','))
self.channels = set(channels or ())
self.ident = ident or nick
self.realname = realname or nick
self.ssl = ssl
self.persist = persist
self.ircv3_caps = set(ircv3_caps or ()) | _default_caps
self.active_caps = set()
self.isupport = {}
self.connect_modes = connect_modes
self.quit_message = quit_message
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.verify_ssl = verify_ssl
self._keepnick_active = False
self._executor = executor
# Set the NickServ identity
if not ns_identity or isinstance(ns_identity, str):
self.ns_identity = ns_identity
else:
self.ns_identity = ' '.join(ns_identity)
# Set the debug file
if not debug:
self.debug_file = None
elif hasattr(debug, 'write'):
self.debug_file = debug
elif hasattr(debug, '__call__'):
self.debug_file = _Logfile(debug)
# Add IRCv3 capabilities.
if self.ns_identity:
self.ircv3_caps.add('sasl')
# Add handlers and set the default message parser
self.change_parser()
self.handlers = {}
self._send_lock = threading.Lock()
if ssl is None and self.port == 6697:
self.ssl = True
# Start the connection
if auto_connect:
self.connect()
# Debug print()
def debug(self, *args, **kwargs):
if self.debug_file:
print(*args, file=self.debug_file, **kwargs)
if hasattr(self.debug_file, 'flush'):
self.debug_file.flush()
# Send raw messages
def quote(self, *msg, force=None, tags=None):
if not tags and msg and isinstance(msg[0], dict):
tags = msg[0]
msg = msg[1:]
if not self.connected and not force:
self.debug('>Q>', *msg)
if not self.sendq:
self.sendq = []
if tags:
msg = (tags,) + msg
self.sendq.append(msg)
return
if (not isinstance(tags, dict)
or ('message-tags' not in self.active_caps and
'draft/message-tags-0.2' not in self.active_caps)):
tags = None
self.debug('>3> ' + repr(tags) if tags else '>>>', *msg)
msg = (' '.join(msg).encode('utf-8').replace(b'\r', b' ')
.replace(b'\n', b' '))
if len(msg) + 2 > self.msglen:
msg = msg[:self.msglen - 2]
if msg[-1] >= 0x80:
msg = msg.decode('utf-8', 'ignore').encode('utf-8')
if tags:
msg = _dict_to_tags(tags) + msg
# Non-blocking sockets can't use sendall() reliably
msg += b'\r\n'
self._send_lock.acquire()
sent_bytes = 0
try: # Apparently try/finally is faster than "with".
while True:
try:
# Attempt to send to the socket
sent_bytes += self.sock.send(msg[sent_bytes:])
except ssl.SSLWantReadError:
# Wait for the socket to become ready again
readable, _, _ = select.select(
(self.sock,), (), (self.sock,),
self.ping_timeout or self.ping_interval
)
continue
except (BlockingIOError, ssl.SSLWantWriteError):
pass
else:
# Break if enough data has been written
if sent_bytes >= len(msg):
break
# Otherwise wait for the socket to become writable
select.select((), (self.sock,), (self.sock,),
self.ping_timeout or self.ping_interval)
except socket.timeout:
# Abort the connection if there was a timeout because the data may
# have been partially written
try:
self.sock.close()
except OSError:
pass
if force:
raise
except (AttributeError, BrokenPipeError):
if force:
raise
finally:
self._send_lock.release()
def send(self, *msg, force=None, tags=None):
if len(msg) > 1:
self.quote(*tuple(map(_prune_arg, msg[:-1])) + (':' + msg[-1],),
force=force, tags=tags)
else:
self.quote(*msg, force=force, tags=tags)
# User-friendly msg, notice, and CTCP functions.
def msg(self, target, *msg, tags=None):
self.quote('PRIVMSG', target, ':' + ' '.join(msg), tags=tags)
def notice(self, target, *msg, tags=None):
self.quote('NOTICE', target, ':' + ' '.join(msg), tags=tags)
def ctcp(self, target, *msg, reply=False, tags=None):
m = (self.notice if reply else self.msg)
return m(target, '\x01{}\x01'.format(' '.join(msg)), tags=tags)
def me(self, target, *msg, tags=None):
return self.ctcp(target, 'ACTION', *msg, tags=tags)
# Allow per-connection handlers
def Handler(self, *events, ircv3=False, colon=True):
return _add_handler(self.handlers, events, ircv3, False, colon)
def CmdHandler(self, *events, ircv3=False, colon=True):
return _add_handler(self.handlers, events, ircv3, True, colon)
# The connect function
def connect(self):
self._send_lock.acquire()
try:
if self.connected is not None:
self.debug('Already connected!')
return
self.connected = False
finally:
self._send_lock.release()
self.debug('Connecting to', self.ip, 'port', self.port)
self.sock = socket.create_connection(
(self.ip, self.port),
timeout=self.ping_timeout or self.ping_interval,
)
if self.ssl:
self.debug('SSL handshake')
ctx = ssl.create_default_context(cafile=get_ca_certs())
if self.verify_ssl:
assert ctx.check_hostname
else:
warnings.warn('Disabling verify_ssl is usually a bad idea.')
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.sock = ctx.wrap_socket(self.sock, server_hostname=self.ip)
# Begin IRCv3 CAP negotiation.
self._current_nick = self._desired_nick
self._unhandled_caps = None
self.quote('CAP LS 302', force=True)
self.quote('USER', self.ident, '0', '*', ':' + self.realname,
force=True)
self.quote('NICK', self._desired_nick, force=True)
atexit.register(self.disconnect)
self.debug('Starting main loop...')
self._sasl = self._pinged = self._keepnick_active = False
self._start_main_loop()
def _start_main_loop(self):
# Start the thread before updating _main_thread so that
# wait_until_disconnected() works correctly.
thread = threading.Thread(target=self._main)
thread.start()
self._main_thread = thread
# Disconnect from IRC.
def disconnect(self, msg=None, *, auto_reconnect=False):
self.persist = auto_reconnect and self.persist
self.connected = None
self.active_caps.clear()
atexit.unregister(self.disconnect)
self._current_nick = self._desired_nick
self._unhandled_caps = None
try:
self.quote('QUIT :' + str(msg or self.quit_message), force=True)
self.sock.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.sock.close()
except:
pass
# Finish capability negotiation
def finish_negotiation(self, cap):
self.debug('Capability', cap, 'handled.')
if self._unhandled_caps:
cap = cap.lower()
if cap in self._unhandled_caps:
del self._unhandled_caps[cap]
if len(self._unhandled_caps) < 1:
self._unhandled_caps = None
if not self.connected:
self.quote('CAP END', force=True)
# Change the message parser
def change_parser(self, parser=ircv3_message_parser):
self._parse = parser
# Start a handler function
def _start_handler(self, handlers, command, hostmask, tags, args):
r = False
for handler in handlers:
r = True
params = [self, hostmask, list(args)]
if not hasattr(handler, 'miniirc_colon') and args and \
args[-1].startswith(':'):
params[2][-1] = args[-1][1:]
if hasattr(handler, 'miniirc_ircv3'):
params.insert(2, dict(tags))
if hasattr(handler, 'miniirc_cmd_arg'):
params.insert(1, command)
if self._executor is None:
threading.Thread(target=handler, args=params).start()
else:
self._executor.submit(handler, *params)
return r
# Launch handlers
def _handle(self, cmd, hostmask, tags, args):
r = False
cmd = str(cmd).upper()
hostmask = tuple(hostmask)
for handlers in (_global_handlers, self.handlers):
if cmd in handlers:
r = self._start_handler(handlers[cmd], cmd, hostmask, tags,
args)
if None in handlers:
self._start_handler(handlers[None], cmd, hostmask, tags, args)
return r
# Launch IRCv3 handlers
def _handle_cap(self, cap):
cap = cap.lower()
self.active_caps.add(cap)
if self._unhandled_caps and cap in self._unhandled_caps:
handled = self._handle(
'IRCv3 ' + cap, ('CAP', 'CAP', 'CAP'), {},
self._unhandled_caps[cap]
)
if not handled:
self.finish_negotiation(cap)
# The main loop
def _main(self):
# Make the socket non-blocking.
self.sock.setblocking(False)
self.debug('Main loop running!')
buffer = b''
while True:
try:
assert len(buffer) < 65535, 'Very long line detected!'
try:
# Acquire the send lock when receiving data because I don't
# think you're supposed to call SSL functions from multiple
# threads at once
self._send_lock.acquire()
try:
raw = self.sock.recv(8192).replace(b'\r', b'\n')
finally:
self._send_lock.release()
if not raw:
raise ConnectionAbortedError
buffer += raw
except (BlockingIOError, ssl.SSLWantReadError):
# Wait for the socket to become ready again
readable, _, _ = select.select(
(self.sock,), (), (self.sock,),
# self.ping_interval should be used when
# self.ping_timeout is None
(self._pinged and self.ping_timeout or
self.ping_interval)
)
# Handle ping timeouts
if not readable:
if self._pinged:
raise TimeoutError
self._pinged = True
self.quote('PING', ':miniirc-ping', force=True)
except ssl.SSLWantWriteError:
select.select((), (self.sock,), (self.sock,),
self.ping_timeout or self.ping_interval)
# Attempt to change nicknames every 30 seconds
if (self._keepnick_active and
time.monotonic() > self._last_keepnick_attempt + 30):
self.send('NICK', self._desired_nick, force=True)
self._last_keepnick_attempt = time.monotonic()
except OSError as e:
self.debug('Lost connection!', repr(e))
self.disconnect(auto_reconnect=True)
while self.persist:
time.sleep(5)
self.debug('Reconnecting...')
try:
self.connect()
except OSError:
self.debug('Failed to reconnect!')
self.connected = None
else:
return
return
raw = buffer.split(b'\n')
buffer = raw.pop()
for line in raw:
line = line.decode('utf-8', 'replace')
if line:
self.debug('<<<', line)
try:
result = self._parse(line)
except Exception:
result = None
if isinstance(result, tuple) and len(result) == 4:
self._handle(*result)
else:
self.debug('Ignored message:', line)
del raw
def wait_until_disconnected(self, *, _timeout=None):
# The main thread may be replaced on reconnects
while self._main_thread and self._main_thread.is_alive():
self._main_thread.join(_timeout)
def main(self):
warnings.warn('The miniirc.IRC.main() function is deprecated and '
'should not be used.', DeprecationWarning, 2)
# The main thread may be started after the if check and before
# _start_main_loop. This function is deprecated so fixing this probably
# isn't worthwhile.
if not self._main_thread or not self._main_thread.is_alive():
self._start_main_loop()
# Handle some IRC messages by default.
@Handler('001')
def _handler(irc, hostmask, args):
irc.connected = True
irc.isupport.clear()
irc._unhandled_caps = None
irc.debug('Connected!')
# Update the current nickname and activate keepnick if required
irc._last_keepnick_attempt = time.monotonic()
irc._keepnick_active = args[0] != irc._desired_nick
irc._current_nick = args[0]
# Apply connection modes
if irc.connect_modes:
irc.quote('MODE', irc.current_nick, irc.connect_modes)
# Log into NickServ if required
if not irc._sasl and irc.ns_identity:
irc.debug('Logging in (no SASL, aww)...')
irc.msg('NickServ', 'identify ' + irc.ns_identity)
# Join channels
if irc.channels:
irc.debug('*** Joining channels...', irc.channels)
irc.quote('JOIN', ','.join(irc.channels))
# Send any queued messages
with irc._send_lock:
sendq, irc.sendq = irc.sendq, None
if sendq:
for i in sendq:
irc.quote(*i)
@Handler('PING', colon=True)
def _handler(irc, hostmask, args):
irc.quote('PONG', *args, force=True)
@Handler('PONG', colon=False)
def _handler(irc, hostmask, args):
if args and args[-1] == 'miniirc-ping' and irc.ping_interval:
irc._pinged = False
@Handler('432', '433')
def _handler(irc, hostmask, args):
if not irc.connected:
try:
return int(irc._current_nick[0])
except ValueError:
pass
if len(irc._current_nick) >= irc.isupport.get('NICKLEN', 20):
return
irc.debug('WARNING: The requested nickname', repr(irc._current_nick),
'is invalid. Trying again with',
repr(irc._current_nick + '_') + '...')
irc._current_nick += '_'
irc.quote('NICK', irc.current_nick, force=True)
@Handler('NICK', colon=False)
def _handler(irc, hostmask, args):
if hostmask[0].lower() == irc._current_nick.lower():
irc._current_nick = args[-1]
# Deactivate keepnick if the client has the right nickname
if irc._current_nick.lower() == irc._desired_nick.lower():
irc._keepnick_active = False
@Handler('PRIVMSG', colon=False)
def _handler(irc, hostmask, args):
if not version:
return
if args[-1].startswith('\x01VERSION') and args[-1].endswith('\x01'):
irc.ctcp(hostmask[0], 'VERSION', version, reply=True)
# Handle IRCv3 capabilities
@Handler('CAP', colon=False)
def _handler(irc, hostmask, args):
if len(args) < 3:
return
cmd = args[1].upper()
if cmd in ('LS', 'NEW'):
caps = args[-1].split(' ')
req = set()
if not irc._unhandled_caps:
irc._unhandled_caps = {}
for raw in caps:
raw = raw.split('=', 1)
cap = raw[0].lower()
if cap in irc.ircv3_caps:
irc._unhandled_caps[cap] = raw
if cap == 'sts':
irc._handle_cap(cap)
else:
req.add(cap)
if irc.connected is None:
return
elif req:
irc.quote('CAP REQ', ':' + ' '.join(req), force=True)
elif cmd == 'LS' and not irc._unhandled_caps and args[2] != '*':
irc._unhandled_caps = None
irc.quote('CAP END', force=True)
elif cmd == 'ACK':
caps = args[-1].split(' ')
for cap in caps:
irc._handle_cap(cap)
elif cmd == 'NAK':
irc._unhandled_caps = None
irc.quote('CAP END', force=True)
elif cmd == 'DEL':
for cap in args[-1].split(' '):
cap = cap.lower()
if cap in irc.active_caps:
irc.active_caps.remove(cap)
# SASL
@Handler('IRCv3 SASL')
def _handler(irc, hostmask, args):
if irc.ns_identity and (len(args) < 2 or 'PLAIN' in
args[-1].upper().split(',')):
irc.quote('AUTHENTICATE PLAIN', force=True)
else:
irc.quote('AUTHENTICATE *', force=True)
irc.finish_negotiation('sasl')
@Handler('AUTHENTICATE', colon=False)
def _handler(irc, hostmask, args):
if args and args[0] == '+':
from base64 import b64encode
irc._sasl = True
pw = irc.ns_identity.split(' ', 1)
pw = '{0}\x00{0}\x00{1}'.format(*pw).encode('utf-8')
irc.quote('AUTHENTICATE', b64encode(pw).decode('utf-8'), force=True)
@Handler('904', '905')
def _handler(irc, hostmask, args):
if irc._sasl:
irc._sasl = False
irc.quote('AUTHENTICATE *', force=True)
@Handler('902', '903', '904', '905')
def _handler(irc, hostmask, args):
irc.finish_negotiation('sasl')
# STS
@Handler('IRCv3 STS')
def _handler(irc, hostmask, args):
if not irc.ssl and len(args) == 2:
try:
port = int(_tags_to_dict(args[1], ',')['port'])
except (IndexError, ValueError):
return
# Stop irc.wait_until_disconnected() from returning early
irc._main_thread = threading.current_thread()
persist = irc.persist
irc.disconnect()
irc.debug('STS detected, enabling TLS/SSL and changing the port to ',
port)
irc.port = port
irc.ssl = True
time.sleep(1)
irc.connect()
irc.persist = persist
else:
irc.finish_negotiation('sts')
# Maximum line length
@Handler('IRCv3 oragono.io/maxline-2')
def _handler(irc, hostmask, args):
try:
irc.msglen = max(int(args[-1]), 512)
except ValueError:
pass
irc.finish_negotiation(args[0])
# Handle ISUPPORT messages
@Handler('005')
def _handler(irc, hostmask, args):
del args[0], args[-1]
isupport = _tags_to_dict(args, None)
# Try and auto-detect integers
remove = set()
for key in isupport:
try:
isupport[key] = int(isupport[key])
# Disable keepnick if the nickname is too long
if key == 'NICKLEN' and len(irc._desired_nick) > isupport[key]:
irc._keepnick_active = False
except ValueError:
if key.endswith('LEN'):
remove.add(key)
for key in remove:
del isupport[key]
irc.isupport.update(isupport)
# Attempt to get the desired nickname if the user that currently has it quits
@Handler('QUIT', 'NICK')
def _handler(irc, hostmask, args):
if (irc.connected and irc._keepnick_active and
hostmask[0].lower() == irc._desired_nick.lower()):
irc.send('NICK', irc._desired_nick, force=True)
irc._last_keepnick_attempt = time.monotonic()
# Stop trying to get the desired nickname if it's invalid or if nick changes
# aren't permitted
@Handler('432', '435', '447')
def _handler(irc, hostmask, args):
irc._keepnick_active = False
_colon_warning = True
del _handler