meetbryce/open-source-slack-ai

View on GitHub
ossai/utils.py

Summary

Maintainability
B
4 hrs
Test Coverage
A
96%
import os
import re
import uuid
from time import mktime, gmtime, strptime
import calendar
from typing import Union

from datetime import date
from dotenv import load_dotenv
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from langchain.callbacks.tracers import LangChainTracer

load_dotenv(override=True)
_id_name_cache = {}


class CustomLangChainTracer(LangChainTracer):
    def __init__(self, is_private=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_private = is_private

    def handleText(self, text, runId):
        if not self.is_private:
            print("passing text")
            super().handleText(text, runId)
        else:
            print("passing no text")
            super().handleText("", runId)


async def get_bot_id(client) -> str:
    """
    Retrieves the bot ID using the provided Slack WebClient.

    Returns:
        str: The bot ID.
    """
    try:
        response = client.auth_test()
        return response["bot_id"]
    except SlackApiError as e:
        print(f"Error fetching bot ID: {e.response['error']}")
        return "None"


async def get_channel_history(
    client: WebClient,
    channel_id: str,
    since: date = None,
    include_threads: bool = False,
) -> list:
    # todo: if include_threads, recursively get messages from threads

    oldest_timestamp = mktime(since.timetuple()) if since else 0
    response = client.conversations_history(
        channel=channel_id, limit=1000, oldest=oldest_timestamp
    )  # 1000 is the max limit
    bot_id = await get_bot_id(client)
    # todo: (optional) excluding all other bots too
    # todo: (optional) exclude messages that start with `/` (i.e. slash commands)
    return [msg for msg in response["messages"] if msg.get("bot_id") != bot_id]


async def get_direct_message_channel_id(client: WebClient, user_id: str) -> str:
    """
    Get the direct message channel ID for the bot, so you can say() via direct message.
    :return str:
    """
    # todo: cache this sucker too
    try:
        response = client.conversations_open(users=user_id)
        return response["channel"]["id"]
    except SlackApiError as e:
        print(f"Error fetching bot DM channel ID: {e.response['error']}")
        raise e


def get_is_private_and_channel_name(
    client: WebClient, channel_id: str
) -> tuple[bool, str]:
    try:
        channel_info = client.conversations_info(channel=channel_id)
        channel_name = channel_info["channel"]["name"]
        is_private = channel_info["channel"]["is_private"]
    except Exception as e:
        print(f"Error getting channel info for is_private, defaulting to private: {e}")
        channel_name = "unknown"
        is_private = True
    return is_private, channel_name


def get_langsmith_config(feature_name: str, user: dict, channel: str, is_private=False):
    run_id = str(uuid.uuid4())
    tracer = CustomLangChainTracer(
        is_private=is_private
    )  # FIXME: this doesn't add privacy like it should

    return {
        "run_id": run_id,
        "metadata": {
            "is_private": is_private,
            **({"user_name": user.get("name")} if "name" in user else {}),
            **({"user_title": user.get("title")} if "title" in user else {}),
            "channel": channel,
        },
        "tags": [feature_name],
        "callbacks": [tracer],
    }


def get_llm_config():
    chat_model = os.getenv("CHAT_MODEL", "gpt-3.5-turbo").strip()
    temperature = float(os.getenv("TEMPERATURE", 0.2))
    openai_api_key = os.getenv("OPENAI_API_KEY", "").strip()
    debug = bool(os.environ.get("DEBUG", False))
    max_body_tokens = int(os.getenv("MAX_BODY_TOKENS", 1000))
    language = os.getenv("LANGUAGE", "english")

    if not openai_api_key:
        raise ValueError("OPENAI_API_KEY is not set in .env file")
    return {
        "chat_model": chat_model,
        "temperature": temperature,
        "OPENAI_API_KEY": openai_api_key,
        "debug": debug,
        "max_body_tokens": max_body_tokens,
        "language": language,
    }


def get_name_from_id(client: WebClient, user_or_bot_id: str, is_bot=False) -> str:
    """
    Retrieves the name associated with a user ID or bot ID.

    Args:
        client (WebClient): An instance of the Slack WebClient.
        user_or_bot_id (str): The user or bot ID.
        is_bot (bool): Whether the ID is a bot ID.

    Returns:
        str: The name associated with the ID.
    """
    if user_or_bot_id in _id_name_cache:
        return _id_name_cache[user_or_bot_id]

    try:
        user_response = client.users_info(user=user_or_bot_id)
        if user_response.get("ok"):
            name = user_response["user"].get(
                "real_name", user_response["user"]["profile"]["real_name"]
            )
            _id_name_cache[user_or_bot_id] = name
            return name
        else:
            print("user fetch failed")
            raise SlackApiError("user fetch failed", user_response)
    except SlackApiError as e:
        if e.response["error"] == "user_not_found":
            try:
                bot_response = client.bots_info(bot=user_or_bot_id)
                if bot_response.get("ok"):
                    _id_name_cache[user_or_bot_id] = bot_response["bot"]["name"]
                    return bot_response["bot"]["name"]
                else:
                    print("bot fetch failed")
                    raise SlackApiError("bot fetch failed", bot_response)
            except SlackApiError as e2:
                print(
                    f"Error fetching name for bot {user_or_bot_id=}: {e2.response['error']}"
                )
        print(f"Error fetching name for {user_or_bot_id=} {is_bot=} {e=}")

    return "Someone"


def get_parsed_messages(client, messages, with_names=True):
    def parse_message(msg):
        user_id = msg.get("user")
        if user_id is None:
            bot_id = msg.get("bot_id")
            name = get_name_from_id(client, bot_id, is_bot=True)
        else:
            name = get_name_from_id(client, user_id)

        # substitute @mentions with names
        parsed_message = re.sub(
            r"<@[UB]\w+>",
            lambda m: get_name_from_id(client, m.group(0)[2:-1]),
            msg["text"],
        )

        if not with_names:
            return re.sub(
                r"<@[UB]\w+>", lambda m: "", msg["text"]
            )  # remove @mentions + don't prepend author name

        return f"{name}: {parsed_message}"

    return [parse_message(message) for message in messages]


def get_text_and_blocks_for_say(
    title: str, run_id: Union[uuid.UUID, None], messages: list
) -> tuple[str, list]:
    CHAR_LIMIT = 3000
    text = "\n".join(messages)

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": title,
            },
        },
    ]

    # Split text into multiple blocks if it exceeds 3000 characters
    remaining_text = text
    while len(remaining_text) > 0:
        chunk = remaining_text[:CHAR_LIMIT]
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": chunk,
                },
            }
        )
        remaining_text = remaining_text[CHAR_LIMIT:]

    if run_id is not None:
        blocks.append(
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": ":-1: Not Helpful"},
                        "action_id": "not_helpful_button",
                        "value": str(run_id),
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": ":+1: Helpful"},
                        "action_id": "helpful_button",
                        "value": str(run_id),
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": ":tada: Very Helpful"},
                        "action_id": "very_helpful_button",
                        "value": str(run_id),
                    },
                ],
            }
        )

    return text.split("\n")[0], blocks


async def get_user_context(client: WebClient, user_id: str) -> dict:
    """
    Get the username and title for the given user ID.
    """
    try:
        user_info = client.users_info(user=user_id)
        print(user_info)
        if user_info["ok"]:
            name = user_info["user"]["name"]
            title = user_info["user"]["profile"]["title"]
            return {"name": name, "title": title}
    except SlackApiError as e:
        print(f"Failed to fetch username: {e}")
        return {}


def get_workspace_name(client: WebClient):
    """
    Retrieve the workspace name using an instantiated Slack WebClient.

    Args:
    - client (WebClient): An instantiated Slack WebClient.

    Returns:
    - str: The workspace name if found, otherwise an empty string.
    """

    try:
        response = client.team_info()
        if response["ok"]:
            return response["team"]["name"]
        else:
            print(f"Error retrieving workspace name: {response['error']}")
            return os.getenv("WORKSPACE_NAME_FALLBACK", "")
    except SlackApiError as e:
        print(f"Error retrieving workspace name: {e.response['error']}")
        return os.getenv("WORKSPACE_NAME_FALLBACK", "")  # None


def get_since_timeframe_presets():
    DAY_OF_SECONDS = 86400
    now = gmtime()
    today = calendar.timegm(
        strptime(f"{now.tm_year}-{now.tm_mon}-{now.tm_mday}", "%Y-%m-%d")
    )
    options = [
        ("Last 7 days", str(today - 7 * DAY_OF_SECONDS)),
        ("Last 14 days", str(today - 14 * DAY_OF_SECONDS)),
        ("Last 30 days", str(today - 30 * DAY_OF_SECONDS)),
        (
            "This week",
            str(today - (now.tm_wday * DAY_OF_SECONDS)),
        ),  # Monday at 00:00:00
        (
            "Last week",
            str(today - (now.tm_wday * DAY_OF_SECONDS) - 7 * DAY_OF_SECONDS),
        ),  # From the start of last week
        (
            "This month",
            str(
                calendar.timegm(strptime(f"{now.tm_year}-{now.tm_mon}-01", "%Y-%m-%d"))
            ),
        ),  # From the start of this month
        (
            "Last month",
            str(
                calendar.timegm(
                    strptime(
                        f"{now.tm_year if now.tm_mon > 1 else now.tm_year - 1}-{now.tm_mon - 1 if now.tm_mon > 1 else 12}-01",
                        "%Y-%m-%d",
                    )
                )
            ),
        ),  # From the start of last month
    ]
    return {
        "type": "static_select",
        "placeholder": {"type": "plain_text", "text": "Select a preset", "emoji": True},
        "action_id": "summarize_since_preset",
        "options": [
            {
                "text": {"type": "plain_text", "text": text, "emoji": True},
                "value": value,
            }
            for (text, value) in options
        ],
    }


def main():
    print("DEBUGGING")


if __name__ == "__main__":
    main()