desophos/idleon-saver

View on GitHub
idleon_saver/gui/main.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import json
import logging
import os
import sys
import threading
from enum import Enum
from pathlib import Path
from zipfile import ZipFile

from idleon_saver.utility import BUGREPORT_LINK, logs_dir, user_dir

# Set log config before all kivy imports
# to ensure that *all* kivy logging obeys our settings.
# (Prevents kivy.config from creating extra log files.)
os.environ["KCFG_KIVY_LOG_DIR"] = str(logs_dir())
os.environ["KCFG_KIVY_LOG_NAME"] = "log_%y-%m-%d_%_.txt"
os.environ["KCFG_KIVY_LOG_LEVEL"] = "debug"
os.environ["KCFG_KIVY_LOG_MAXFILES"] = "10"

from kivy.config import Config

# We need to change kivy config before other kivy imports.
Config.set("graphics", "width", 820)
Config.set("graphics", "minimum_width", 700)
Config.set("graphics", "height", 300)
Config.set("graphics", "minimum_height", 300)
Config.set("input", "mouse", "mouse,disable_multitouch")

from kivy.app import App
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.properties import ListProperty, ObjectProperty, OptionProperty, StringProperty
from kivy.resources import resource_add_path
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.popup import Popup
from kivy.uix.screenmanager import Screen, ScreenManager

# Make other modules use Kivy's logger.
logging.Logger.manager.root = Logger

from idleon_saver.scripts import inject
from idleon_saver.scripts.export import FirebaseExporter
from idleon_saver.utility import Formats, friendly_name


class ButtonBox(BoxLayout):
    pass


class ExportButtonBox(ButtonBox):
    button = ObjectProperty(None)
    label = ObjectProperty(None)
    fmt = OptionProperty(Formats.IC, options=list(Formats))
    try_export = ObjectProperty(None)

    def on_kv_post(self, _):
        self.label.text = f"to {friendly_name(self.fmt.value)}"
        self.button.bind(on_release=self.btn_released)

    def btn_released(self, instance):
        if self.try_export(self.fmt):
            instance.text = "Exported"


class VBox(BoxLayout):
    pass


class ErrorDialog(VBox):
    text = StringProperty("")
    done = ObjectProperty(None)

    @staticmethod
    def open_logs():
        with ZipFile(user_dir() / "logs.zip", "w") as zf:
            for f in user_dir().rglob("*.*"):
                # Include all data files
                if f.suffix != ".zip":
                    zf.write(f, f.name)
        # skipcq: BAN-B606
        os.startfile(user_dir(), "explore")

    @staticmethod
    def open_github():
        # skipcq: BAN-B606
        os.startfile(BUGREPORT_LINK)


class FileChooserDialog(VBox):
    filters = ListProperty([])
    done = ObjectProperty(None)
    cancel = ObjectProperty(None)


class MyScreen(Screen):
    def dismiss_popup(self):
        try:
            self._popup.dismiss()
        except AttributeError as e:
            Logger.exception("Popup dismissed before being created", exc_info=e)

    def popup_error(self, text):
        content = ErrorDialog(text=text, done=self.dismiss_popup)
        # skipcq: PYL-W0201
        self._popup = Popup(title="Error :(", content=content, size_hint=(0.95, 0.95))
        self._popup.open()


class StartScreen(MyScreen):
    pass


class EndScreen(MyScreen):
    export = ObjectProperty(None)

    def try_export(self, fmt: Formats):
        try:
            self.export(fmt)
        # skipcq: PYL-W0703
        except Exception as e:
            Logger.exception(e)
            self.popup_error(
                text=(
                    "Oops! Something went wrong.\n\n"
                    "If you keep getting this error, "
                    "please report it on GitHub with "
                    "your logs.zip attached."
                )
            )
            return False
        else:
            return True


Blockers = Enum("Blockers", "PATH ACTION")


class PathScreen(MyScreen):
    back = ObjectProperty(None)
    next = ObjectProperty(None)
    action = ObjectProperty(None)
    caption = StringProperty("")
    default_path = StringProperty("")
    path_input = ObjectProperty(None)
    path_filters = ListProperty([])
    error = ObjectProperty(None)
    instructions = StringProperty("")
    progress = ObjectProperty(None)

    blockers = dict.fromkeys(list(Blockers), False)
    action_done = threading.Event()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.on_path_text(self.default_path)
        # Recheck path every second in case of filesystem changes.
        # Hopefully not a performance issue.
        Clock.schedule_interval(lambda dt: self.on_path_text(), 1)
        self.next.bind(
            on_release=lambda _: asyncio.get_running_loop().create_task(
                self.start_action()
            )
        )

    def block_next(self, which: Blockers, val: bool):
        """Disable "Next" button if blocked for any reason.

        Enable when all blocks are cleared.
        True means "blocked" to match Button.disabled.
        """
        self.blockers[which] = val
        self.next.disabled = any(self.blockers.values())

    def show_filebrowser(self):
        content = FileChooserDialog(
            done=self.set_path, cancel=self.dismiss_popup, filters=self.path_filters
        )
        # skipcq: PYL-W0201
        self._popup = Popup(title="Find file", content=content, size_hint=(1, 1))
        self._popup.open()

    def set_path(self, directory, filename):
        try:
            self.path_input.text = str(Path(directory, filename[0]))
        except IndexError:
            pass  # no file selected, so just treat it like canceling
        self.dismiss_popup()

    def on_path_text(self, text=None):
        if text is None:
            text = self.path_input.text

        if text and Path(text).exists() and Path(text).name == "LegendsOfIdleon.exe":
            # Valid path -> allow next
            self.error.opacity = 0.0
            self.block_next(Blockers.PATH, False)
        else:
            # Invalid path -> block next
            self.error.opacity = 1.0
            self.block_next(Blockers.PATH, True)

    def increment_progress(self, amt: int):
        """Increments progress until self.action_done is set.

        Args:
            amt: Amount to increment progress bar by.
            slp: Seconds to sleep in between increments.

        Time to fill progress bar = 100 * slp / amt.
        """
        if self.action_done.is_set():
            # Reset progress bar.
            self.progress.opacity = 0.0
            self.progress.value = 0
            return False  # Stop scheduling this callback.
        else:
            self.progress.opacity = 1.0
            self.progress.value += amt

    async def start_action(self):
        # Disable "Next" button until action is completed.
        self.next.text = "Loading..."
        self.block_next(Blockers.ACTION, True)

        # Increment progress bar while action is underway.
        self.action_done.clear()
        Clock.schedule_interval(lambda dt: self.increment_progress(1), 0.3)

        try:
            # Use threading to avoid freezing the UI.
            await asyncio.to_thread(self.action, self.path_input.text)
        # skipcq: PYL-W0703
        except Exception as e:
            Logger.exception(e)
            self.popup_error(
                text=(
                    "Oops! Something went wrong. "
                    "Make sure Steam is running and Legends of Idleon is closed, "
                    "then check the path and try again. "
                    "If it still doesn't work, "
                    "make sure you've played a character recently.\n\n"
                    "If you keep getting this error, "
                    "please report it on GitHub with your logs.zip attached."
                )
            )
        else:
            self.manager.transition.direction = "left"
            self.manager.current = self.manager.next()
        finally:
            self.action_done.set()
            # Only re-enable "Next" button once action is completed.
            self.next.text = "Next"
            self.block_next(Blockers.ACTION, False)


class MainWindow(ScreenManager):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.savedata = None
        self.userdir = user_dir()

        screens = [
            StartScreen(name="start"),
            PathScreen(
                caption="Path to LegendsOfIdleon.exe:",
                default_path="C:/Program Files (x86)/Steam/steamapps/common/Legends of Idleon/LegendsOfIdleon.exe",
                path_filters=["*.exe"],
                instructions="Make sure Steam is running and Legends of Idleon is closed, then click Next.\nLegends of Idleon will open briefly to retrieve your save data.",
                name="find_exe",
                action=self.get_json,
            ),
            EndScreen(name="end", export=self.export),
        ]

        for screen in screens:
            self.add_widget(screen)

    @staticmethod
    def download_savedata(path: str) -> dict:
        return inject.main(Path(path))

    def get_json(self, path: str):
        self.savedata = self.download_savedata(path)
        with open(self.userdir / "idleon_save.json", "w", encoding="utf-8") as file:
            json.dump(self.savedata, file)

    def export(self, fmt: Formats):
        FirebaseExporter(self.savedata).export(fmt, self.userdir)

    def next(self):
        # don't wrap from last to first screen
        if self.current == self.screen_names[-1]:
            return self.current
        else:
            return super().next()

    def previous(self):
        # don't wrap from first to last screen
        if self.current == self.screen_names[0]:
            return self.current
        else:
            return super().previous()


class IdleonSaver(App):
    def build(self):
        # skipcq: PYL-W0201
        self.title = "Idleon Saver"
        return MainWindow()


if __name__ == "__main__":
    import importlib.metadata

    # Add data dir to Kivy path in frozen bundle.
    if hasattr(sys, "_MEIPASS"):
        # skipcq: PYL-W0212
        resource_add_path(os.path.join(sys._MEIPASS))  # type: ignore[attr-defined]

    Logger.info(f"Idleon Saver: version {importlib.metadata.version('idleon_saver')}")

    asyncio.run(IdleonSaver(kv_file="main.kv").async_run())