itsVale/Vale.py

View on GitHub
cogs/fun/currency.py

Summary

Maintainability
A
3 hrs
Test Coverage
import collections
import enum
import io
import math
import random

import discord
from discord.ext import commands
from PIL import Image

from utils import db
from utils.colors import random_color
from utils.examples import get_example, wrap_example
from utils.formats import pluralize
from utils.misc import run_in_executor
from utils.time import duration_units


class Money(db.Table, table_name='currency'):
    # Per-user balance, stored in the `currency` table that `get_money`,
    # `add_money` and the betting/giving commands query directly.

    # Key of the row; callers in this file pass `ctx.author.id` / `user.id`.
    user_id = db.Column(db.BigInt, primary_key=True)
    # Current balance.
    # NOTE(review): `add_money` inserts with positional `VALUES ($1, $2)`, so it
    # presumably relies on (user_id, amount) column order - confirm before reordering.
    amount = db.Column(db.Integer)


class Givelog(db.Table):
    # Audit trail of transfers: the `give` command inserts one row
    # (giver, recipient, amount) into `givelog` per successful transfer.
    # No explicit table name is given here; presumably `db.Table` derives
    # `givelog` from the class name - verify against utils.db.

    id = db.Column(db.Serial, primary_key=True)
    # ID of the user who gave the money (the command author).
    giver = db.Column(db.BigInt)
    # ID of the user who received the money.
    recipient = db.Column(db.BigInt)
    # Amount that was transferred.
    amount = db.Column(db.Integer)
    # Not supplied by the INSERT in `give`; filled from this SQL default expression.
    time = db.Column(db.Timestamp, default="now() at time zone 'utc'")


class DailyCashCooldowns(db.Table, table_name='daily_cash_cooldowns'):
    # One row per user recording when `daily` was last claimed; the `daily`
    # command reads it to enforce DAILY_CASH_COOLDOWN_TIME and upserts it on success.

    user_id = db.Column(db.BigInt, primary_key=True)
    # Written from `ctx.message.created_at` of the most recent successful claim.
    latest_time = db.Column(db.Timestamp)


class DailyLog(db.Table):
    # History of `daily` payouts: the `daily` command inserts one row
    # (user_id, time, amount) into `dailylog` per successful claim.
    # No explicit table name is given here; presumably `db.Table` derives
    # `dailylog` from the class name - verify against utils.db.

    id = db.Column(db.Serial, primary_key=True)
    # ID of the user who claimed.
    user_id = db.Column(db.BigInt)
    # Creation time of the message that triggered the claim.
    time = db.Column(db.Timestamp)
    # Amount that was paid out for this claim.
    amount = db.Column(db.Integer)


# Seconds a user has to wait between two successful uses of `daily` (24 hours).
DAILY_CASH_COOLDOWN_TIME = 24 * 60 * 60
# Accounts must be at least this many days old to use `daily` or `give`.
MINIMUM_ACCOUNT_AGE = 7
# The same threshold in seconds; used to work out how long a young account still has to wait.
MINIMUM_ACCOUNT_AGE_IN_SECONDS = 60 * 60 * 24 * MINIMUM_ACCOUNT_AGE


class AccountTooYoung(commands.CheckFailure):
    """Raised by the `maybe_not_alt` check when the invoking account does not
    yet satisfy the minimum account age (`MINIMUM_ACCOUNT_AGE` days).

    The exception message is user-facing: `Currency.cog_command_error` sends it
    back to the channel as-is.
    """


def maybe_not_alt():
    """Build a command check that rejects accounts younger than the minimum age.

    The account age is measured from `ctx.author.created_at` to the creation
    time of the invoking message and compared, in seconds, against
    `MINIMUM_ACCOUNT_AGE_IN_SECONDS`.

    Returns:
        The decorator produced by `commands.check` for the age predicate.

    Raises (from the predicate):
        AccountTooYoung: when the account is younger than the threshold; the
            message tells the user how much longer they have to wait.
    """

    def predicate(ctx):
        age = ctx.message.created_at - ctx.author.created_at

        # Compare in seconds against the same threshold that is used for the
        # wait message. The previous `age.days > MINIMUM_ACCOUNT_AGE` test
        # required a full extra day, so accounts between 7 and 8 days old were
        # rejected with a negative remaining time.
        remaining = MINIMUM_ACCOUNT_AGE_IN_SECONDS - age.total_seconds()
        if remaining <= 0:
            return True

        retry_after = duration_units(remaining)
        raise AccountTooYoung(
            f'Sorry, but your account is too young. Please wait for {retry_after} before you can use '
            f'`{ctx.clean_prefix}{ctx.command}`.'
        )

    return commands.check(predicate)


class Side(enum.Enum):
    """A side of the coin.

    Each side has a long name and a one-letter alias (`h`, `t`), so users can
    type either one. Doubles as a command argument converter via `convert`.
    """

    heads = h = 'heads'
    tails = t = 'tails'

    # Probably implement these one day too?
    # edge = e = 'edge'
    # none = n = 'none'

    def __str__(self):
        # The plain value ('heads' / 'tails') is what ends up in messages and file names.
        return self.value

    @classmethod
    async def convert(cls, _, argument):
        """Turn user input into a `Side`, ignoring case.

        Member names and their aliases are both accepted. Raises
        `commands.BadArgument` when the input names no side.
        """
        name = argument.lower()
        try:
            member = cls[name]
        except KeyError:
            raise commands.BadArgument(f'{argument} is not a valid side.')
        return member

    @classmethod
    def random_example(cls, _):
        """Return a random accepted spelling (member name or alias) as a string."""
        accepted_names = list(cls.__members__)
        return random.choice(accepted_names)


# Outcomes a flip can currently produce. `list(Side)` iterates the enum's
# members and the `[:2]` slice keeps the first two of them.
SIDES = list(Side)[:2]
# Weights handed to `random.choices` together with SIDES. The third weight has
# no matching entry in SIDES and is dropped by the `[:2]` slice - presumably it
# is reserved for the commented-out `edge` side; confirm before enabling it.
WEIGHTS = [0.4999, 0.4999, 0.0002][:2]


class _DummyUser(collections.namedtuple('_DummyUser', ['id'])):
    """Stand-in for a user the bot cannot resolve.

    Only carries the ID and mimics the `mention` attribute that the
    leaderboard reads from real user objects.
    """

    @property
    def mention(self):
        user_id = self.id
        return f'<Unknown User | ID: {user_id}>'


class NotNegative(commands.BadArgument):
    """Raised by `positive_int` when the given amount is zero or negative."""


class SideOrAmount(commands.Converter):
    """Converter for an argument that is either a coin `Side` or an integer.

    A side takes precedence: the input is only parsed as an integer when it
    does not name a side.
    """

    # Example types, indexed by the boolean flag toggled in `random_example`
    # (False -> Side, True -> int).
    __types = (Side, int)

    async def convert(self, ctx, argument):
        """Return a `Side` if `argument` names one, otherwise `int(argument)`.

        Raises:
            commands.BadArgument: when the input is neither a side nor an integer.
        """
        try:
            return await Side.convert(ctx, argument)
        except commands.BadArgument:
            # Not a side; fall through and try it as a number.
            pass

        try:
            return int(argument)
        except ValueError:
            # The ValueError adds nothing for the user-facing error, so drop it
            # from the exception chain.
            raise commands.BadArgument(f'`{argument}` is not an amount or side.') from None

    @classmethod
    def random_example(cls, ctx):
        """Return an example argument, alternating between the two accepted types.

        A flag stored on `ctx` is flipped on every call, so successive examples
        for the same context switch between an integer and a side (the first
        call yields the integer example).
        """
        ctx.__sideoramount_flag__ = type_index = not getattr(ctx, '__sideoramount_flag__', False)
        return get_example(cls.__types[type_index], ctx)


def positive_int(argument):
    """Parse `argument` as an integer that must be strictly greater than zero.

    Returns the parsed integer. Raises `NotNegative` for zero or negative
    values; whatever `int()` raises for unparsable input propagates unchanged.
    """
    number = int(argument)
    if number <= 0:
        raise NotNegative('Expected a positive value.')

    return number


class PositiveIntOnlyOnSide(commands.Converter):
    """Positive-integer converter that is only valid after a non-integer argument.

    Used for the bet amount of `flip`: a bet makes no sense when the previous
    argument was already parsed as a number of coins.
    """

    async def convert(self, ctx, argument):
        # `ctx.args[-1]` is the most recently converted argument of this invocation.
        previous = ctx.args[-1]
        if not isinstance(previous, int):
            return positive_int(argument)

        raise commands.BadArgument(f'Hm, I thought you wanted to flip it {previous} times?')


@wrap_example(positive_int)
@wrap_example(PositiveIntOnlyOnSide)
def _positive_int_example(ctx):
    """Produce an example amount for `positive_int` / `PositiveIntOnlyOnSide`.

    Roughly half of the time this is a small fixed number (1, 5 or 10, as an
    int); otherwise it is a string of 2-5 nines or a one followed by 2-5 zeros.
    """
    if random.random() <= 0.5:
        digits = random.randint(2, 5)
        all_nines = '9' * digits
        power_of_ten = '1' + '0' * digits
        return random.choice([all_nines, power_of_ten])

    return random.choice([1, 5, 10])


class NonBlacklistedMember(commands.MemberConverter):
    """Member converter that additionally refuses blacklisted members.

    The blacklist is looked up through the `Blacklists` cog; when that cog is
    not loaded, every resolvable member is accepted.
    """

    async def convert(self, ctx, argument):
        resolved = await super().convert(ctx, argument)

        blacklists_cog = ctx.bot.get_cog('Blacklists')
        # The lookup is only awaited when the cog is actually present.
        if blacklists_cog and await blacklists_cog.get_blacklist(resolved, connection=ctx.db):
            raise commands.BadArgument('This user is blacklisted.')

        return resolved

    @staticmethod
    def random_example(ctx):
        # Examples are the same as for a plain member argument.
        return get_example(discord.Member, ctx)


class Currency(commands.Cog):
    def __init__(self, bot):
        """Store the bot and preload the two coin images as RGBA.

        Raises:
            RuntimeError: if the width and height of both images are not all
                one and the same value.
        """
        self.bot = bot

        heads_path = 'data/images/coins/heads.png'
        tails_path = 'data/images/coins/tails.png'
        with open(heads_path, 'rb') as heads_file, open(tails_path, 'rb') as tails_file:
            self._heads_image = Image.open(heads_file).convert('RGBA')
            self._tails_image = Image.open(tails_file).convert('RGBA')

        # Pool every dimension of both images into one set; it holds a single
        # value only when all of them are equal.
        dimensions = set(self._heads_image.size) | set(self._tails_image.size)
        if len(dimensions) != 1:
            raise RuntimeError('Images must be the same size.')

    def cog_unload(self):
        """Close the coin images that `__init__` loaded."""
        for coin_image in (self._heads_image, self._tails_image):
            coin_image.close()

    async def cog_command_error(self, ctx, error):
        """Reply to the two error types this cog raises itself.

        `AccountTooYoung` carries a ready-made user-facing message and is sent
        as-is; `NotNegative` gets a fixed reply. Any other error is not
        handled here.
        """
        if isinstance(error, AccountTooYoung):
            await ctx.send(error)
        elif isinstance(error, NotNegative):
            await ctx.send('Fuck off. You\'re not going to mess up my economy!')

    @property
    def image_size(self):
        """First component of the heads image's size.

        `__init__` enforces that every dimension of both coin images is equal,
        so this one number describes a coin tile on either axis.
        """
        dimensions = self._heads_image.size
        return dimensions[0]

    async def get_money(self, user_id, *, connection=None):
        """Return the stored balance of `user_id`, or 0 if they have no row.

        `connection` is the object to run the query on; when it is omitted
        (or falsy) the bot's pool is used.
        """
        if not connection:
            connection = self.bot.pool

        record = await connection.fetchrow('SELECT amount FROM currency WHERE user_id = $1;', user_id)
        if not record:
            return 0

        return record['amount']

    async def add_money(self, user_id, amount, *, connection=None):
        """Add `amount` to the balance of `user_id`, creating the row if needed.

        A negative `amount` subtracts. `connection` is the object to run the
        statement on; when it is omitted (or falsy) the bot's pool is used.
        """
        if not connection:
            connection = self.bot.pool

        # Upsert: a new user starts at `amount`, an existing one is incremented.
        upsert = """
            INSERT INTO   currency
            VALUES        ($1, $2)
            ON CONFLICT   (user_id)
            DO UPDATE SET amount = currency.amount + $2;
        """
        await connection.execute(upsert, user_id, amount)

    @commands.command(name='cash', aliases=['money', 'coins'])
    async def _cash(self, ctx, user: discord.Member = None):
        """Shows how much money you have."""

        # Without an explicit member the command reports on the caller.
        member = user if user else ctx.author
        balance = await self.get_money(member.id, connection=ctx.db)

        if balance:
            await ctx.send(f'{member} has **{balance}** \N{MONEY WITH WINGS}.')
            return None

        return await ctx.send(f'{member} has nothing.')

    @commands.command(name='leaderboard')
    async def _leaderboard(self, ctx):
        """Shows the 10 richest people."""

        query = """
            SELECT     user_id, amount FROM currency
            WHERE      amount > 0
            ORDER BY   amount
            DESC LIMIT 10;
        """
        records = await ctx.db.fetch(query)

        lines = []
        for user_id, amount in records:
            # Users missing from the bot's cache are shown through a placeholder
            # that still offers a `mention`.
            user = self.bot.get_user(user_id) or _DummyUser(user_id)
            lines.append(f'{user.mention} with **{amount}**')

        # Probably paginate this?
        embed = discord.Embed(
            title='Top 10 richest people',
            description='\n'.join(lines),
            color=random_color(),
        )
        await ctx.send(embed=embed)

    @commands.command(name='give')
    @maybe_not_alt()
    async def _give(self, ctx, amount: positive_int, user: NonBlacklistedMember):
        """Gives some of your money to another user.

        You must have at least the amount you are trying to give.
        """

        if ctx.author == user:
            return await ctx.send('Wait...Did you really try to give money to yourself?!')

        # Check the balance and debit it in ONE statement. Reading the balance
        # first and updating afterwards left a window in which concurrent
        # `give` invocations could each pass the check and overdraw the
        # account. The row is only touched (and returned) when the giver can
        # afford the amount; a missing row yields no result as well.
        query = """
            UPDATE    currency
            SET       amount = amount - $2
            WHERE     user_id = $1 AND amount >= $2
            RETURNING amount;
        """
        debited = await ctx.db.fetchrow(query, ctx.author.id, amount)
        if debited is None:
            return await ctx.send('You don\'t have enough to give it away.')

        # Credit the recipient (creates their row if they never had money).
        await self.add_money(user.id, amount, connection=ctx.db)

        # Keep an audit record of the transfer.
        query = 'INSERT INTO givelog (giver, recipient, amount) VALUES ($1, $2, $3);'
        await ctx.db.execute(query, ctx.author.id, user.id, amount)
        await ctx.message.add_reaction('\N{OK HAND SIGN}')

    @commands.command(name='award')
    @commands.is_owner()
    async def _award(self, ctx, amount: int, *, user: discord.User):
        """Awards some money to a user."""

        recipient_id = user.id
        await self.add_money(recipient_id, amount, connection=ctx.db)

        # A reaction on the invoking message serves as the confirmation.
        await ctx.message.add_reaction('\N{OK HAND SIGN}')

    @commands.command(name='take')
    @commands.is_owner()
    async def _take(self, ctx, amount: int, *, user: discord.User):
        """Takes some money away from an user."""

        balance = await self.get_money(user.id, connection=ctx.db)
        if balance:
            # Cap the deduction at what the user currently has.
            deduction = min(amount, balance)
            await self.add_money(user.id, -deduction, connection=ctx.db)
            await ctx.message.add_reaction('\N{OK HAND SIGN}')
            return None

        return await ctx.send(f'{user.mention} has no money left...yeah..')

    @staticmethod
    async def _default_flip(ctx):
        """Flip a single coin and post the result together with its picture."""

        (outcome,) = random.choices(SIDES, WEIGHTS)
        # The image on disk is named after the side's string value.
        attachment = discord.File(f'data/images/coins/{outcome}.png', 'coin.png')

        embed = discord.Embed(title=f'...Flipped {outcome}!', color=random_color())
        embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.avatar_url)
        # Point the embed at the uploaded attachment.
        embed.set_image(url='attachment://coin.png')

        await ctx.send(file=attachment, embed=embed)

    @run_in_executor
    def _flip_image(self, num_sides):
        """Flip `num_sides` coins and render them as one grid image.

        Returns a `(message, file)` pair: `message` summarises how often each
        side came up and `file` is the PNG of all coins wrapped in a
        `discord.File` named `flipcoins.png`.
        """
        # Lay the coins out on a near-square grid: `columns` per row, `rows` rows.
        root = num_sides ** 0.5
        rows = round(root)
        columns = int(math.ceil(root))

        tile = self.image_size
        canvas = Image.new('RGBA', (columns * tile, rows * tile))

        coin_images = {
            Side.heads: self._heads_image,
            Side.tails: self._tails_image,
        }
        tally = collections.Counter()

        for position in range(num_sides):
            outcome = random.choices(SIDES, WEIGHTS)[0]
            # Fill the grid row by row, left to right.
            row, column = divmod(position, columns)
            canvas.paste(coin_images[outcome], (column * tile, row * tile))
            tally[outcome] += 1

        # 'heads' -> 'head', 'tails' -> 'tail': `pluralize` receives the singular
        # as keyword name and the count as value.
        message = ' and '.join(
            pluralize(**{str(outcome)[:-1]: count})
            for outcome, count in tally.items()
        )

        buffer = io.BytesIO()
        canvas.save(buffer, 'png')
        # Rewind so the upload reads the PNG from its beginning.
        buffer.seek(0)

        return message, discord.File(buffer, 'flipcoins.png')

    async def _numbered_flip(self, ctx, number):
        """Flip `number` coins at once.

        A single coin is delegated to `_default_flip`; more than 100 or fewer
        than 1 coins are refused with a message; anything in between is
        rendered as one combined image.
        """
        if number == 1:
            await self._default_flip(ctx)
            return

        if number > 100:
            await ctx.send('I am not going to flip that many coins for you.')
            return

        if number <= 0:
            await ctx.send('Wtf, how is that supposed to work?')
            return

        summary, attachment = await self._flip_image(number)

        embed = discord.Embed(title=f'...Flipped {summary}', color=random_color())
        embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.avatar_url)
        embed.set_image(url='attachment://flipcoins.png')

        await ctx.send(file=attachment, embed=embed)

    @commands.command(name='flip')
    @commands.bot_has_permissions(embed_links=True, attach_files=True)
    async def _flip(self, ctx, side_or_number: SideOrAmount = None, amount: PositiveIntOnlyOnSide = None):
        """Flips a coin.

        The first argument can either be the side (heads or tails) or the number of coins you want to flip.
        If you don't type anything, it will flip one coin.

        If you specify a side for the first argument, you can
        also type the amount of money you wish to bet on for this flip.
        Getting it right gives you 2.0x the money you've bet.
        """

        # Compare against None explicitly: a plain truthiness test also matched
        # a parsed `0`, so `flip 0` silently flipped one coin instead of
        # reaching the "<= 0" reply in `_numbered_flip`.
        if side_or_number is None:
            return await self._default_flip(ctx)
        if isinstance(side_or_number, int):
            return await self._numbered_flip(ctx, side_or_number)

        # From here on the first argument is a Side the user is guessing.
        side = side_or_number
        is_betting = amount is not None

        if is_betting:
            money = await self.get_money(ctx.author.id, connection=ctx.db)
            if money < amount:
                return await ctx.send('You don\'t have enough.')

        actual = random.choices(SIDES, WEIGHTS)[0]
        won = actual == side

        if won:
            message = 'You won!'
            color = 0x4CAF50
            if is_betting:
                # The stake is returned doubled, i.e. a net gain of `amount`.
                message += f'\nYou won **{amount}** \N{MONEY WITH WINGS}.'
        else:
            message = 'You lost!'
            color = 0xF44336
            if is_betting:
                # The emoji is appended once by the sentence below; it used to
                # be part of this fragment too and was printed twice.
                lost = '**everything**' if amount == money else f'**{amount}**'
                message += f'\nYou lost {lost} \N{MONEY WITH WINGS}.'

        if is_betting:
            # Net effect on the balance: +amount on a win, -amount on a loss.
            # NOTE(review): the balance was read before the flip; this update
            # is not guarded against a concurrent spend in between.
            balance_change = amount if won else -amount
            query = 'UPDATE currency SET amount = amount + $2 WHERE user_id = $1;'
            await ctx.db.execute(query, ctx.author.id, balance_change)

        file = discord.File(f'data/images/coins/{actual}.png', 'coin.png')

        embed = (discord.Embed(description=message, color=color)
                 .set_author(name=ctx.author.display_name, icon_url=ctx.author.avatar_url)
                 .set_image(url='attachment://coin.png'))

        await ctx.send(file=file, embed=embed)

    @commands.command(name='daily')
    @maybe_not_alt()
    async def _daily_cash(self, ctx):
        """Command to give you daily cash (between 10 and 200).

        As the name suggests, you can only use this command every 24 hours.
        """

        author_id = ctx.author.id
        # The invoking message's timestamp is the reference time for this claim.
        now = ctx.message.created_at

        last_claim = await ctx.db.fetchrow(
            'SELECT latest_time FROM daily_cash_cooldowns WHERE user_id = $1;',
            author_id,
        )

        if last_claim:
            elapsed = (now - last_claim['latest_time']).total_seconds()
            retry_after = DAILY_CASH_COOLDOWN_TIME - elapsed

            if retry_after > 0:
                wait = duration_units(retry_after)
                return await ctx.send(f'Don\'t be greedy. Wait at least {wait} before using this command again.')

        # Start (or restart) the cooldown before paying out.
        cooldown_upsert = """
            INSERT INTO   daily_cash_cooldowns
            VALUES        ($1, $2)
            ON CONFLICT   (user_id)
            DO UPDATE SET latest_time = $2;
        """
        await ctx.db.execute(cooldown_upsert, author_id, now)

        amount = random.randint(10, 200)
        await self.add_money(author_id, amount, connection=ctx.db)

        # Record the payout in the history table.
        log_insert = 'INSERT INTO dailylog (user_id, time, amount) VALUES ($1, $2, $3);'
        await ctx.db.execute(log_insert, author_id, now, amount)

        await ctx.send(f'{ctx.author.mention}, for your daily hope you will receive **{amount}** \N{MONEY WITH WINGS}.')


def setup(bot):
    """Create the `Currency` cog and register it on `bot`."""
    cog = Currency(bot)
    bot.add_cog(cog)