srevinsaju/guiscrcpy

View on GitHub
guiscrcpy/lib/mapper/mapper.py

Summary

Maintainability
A
1 hr
Test Coverage
"""
GUISCRCPY by srevinsaju
Get it on : https://github.com/srevinsaju/guiscrcpy
Licensed under GNU Public License

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import json
import os
import time
import uuid
import subprocess

from qtpy import QtWidgets
from qtpy.QtCore import QThread

from pynput import keyboard

from guiscrcpy.lib.bridge.exceptions import AdbRuntimeError
from guiscrcpy.lib.mapper.ux import MapperUI

fixed_pos = [0.0, 0.0]
final_pos = [0.0, 0.0]

json_file = "guiscrcpy.mapper.json"


def log(category, message):
    print("[{}]".format(str(category).upper()), message)


class Mapper:
    def __init__(self, device_id, adb, config_path=None):
        self.config = dict()
        self._device_id = device_id
        self.app = None
        self.adb = adb
        log("mapper", "Waiting for device...")
        self.adb.command(["wait-for-any-device"])
        log("mapper", "Device connection established...")
        self.window = None
        self.guiscrcpy_mapper_json = config_path
        self.dimensions = adb.get_dimensions(device_id)
        log("mapper", "Checking device orientation")
        if self.check_orientation() == 1:
            # reverse the detected dimensions.
            # possibly the device is landscape / not the default
            # orientation as detected by Android Window Manager
            self.dimensions = self.dimensions[::-1]

    def check_orientation(self):
        proc = self.adb.shell("dumpsys input")
        try:
            e_code = proc.wait(5)
        except subprocess.TimeoutExpired:
            e_code = 0
            log(
                "mapper",
                "Failed to detect orientation instantly. Expect"
                "invalid orientations.",
            )
        if e_code != 0:
            # process failed
            raise AdbRuntimeError(
                "adb failed with {ecode} when trying to "
                "execute command "
                "`adb shell dumpsys input`".format(ecode=e_code)
            )
        out, err = proc.communicate()
        out, err = out.decode(), err.decode()
        if "SurfaceOrientation" in out:
            # SurfaceOrientation gives the idea if the device is
            # landscape or portait. SurfaceOrientation: 1 mentions that
            # the mobile is oriented in the landscape orientation
            # SugrfaceOrientation: 0 indicates, default
            print("Detected SurfaceOrientation, processing...")
            if "SurfaceOrientation: 0" in out:
                print("Detected Portait orientation...")
                return 0
            elif "SurfaceOrientation: 1" in out:
                print("Detected Landscape orientation...")
                return 1
            else:
                print("Failed to detect orientation from device. " "Fallback to 0")
                return 0
        else:
            print(
                "Failed to detect Orientation. SurfaceOrientation" " key was not found"
            )
            return 0

    def set_device_id(self, device_id):
        """
        sets the internal device id to the :param
        :param device_id:
        :type device_id:
        :return:
        :rtype:
        """
        self._device_id = device_id

    def get_device_id(self):
        return self._device_id

    def get_screenshot(self):
        """
        Gets the screenshot using `adb screncap` command
        :return:
        :rtype:
        """
        print("Make sure your phone display is switched on.")
        uid = uuid.uuid4().hex  # generate a random seed
        print("Generating Screenshot: {}".format(uid))

        # capture screenshot using `adb screencap`
        print("Please wait. A full definition screenshot is being captured")

        # revise: is sdcard a good place to store files
        adb_screencap_process = self.adb.command(
            ["shell", "screencap", "-p", "/sdcard/{uid}.png".format(uid=uid)],
            device_id=self._device_id,
        )
        adb_screencap_process_ecode = adb_screencap_process.wait(500)
        if adb_screencap_process_ecode != 0:
            print("Screenshot failed. Exiting")
            print(adb_screencap_process.stdout.read().decode("utf-8"))
            return
        # sleep for two seconds so that the image is processed
        time.sleep(2)

        # pull screenshot from android using `adb pull`
        adb_pull_process = self.adb.command(
            [
                "pull",
                f"/sdcard/{uid}.png",
                "{dest}".format(dest=os.path.dirname(self.guiscrcpy_mapper_json)),
            ],
            device_id=self._device_id,
        )
        adb_pull_process_ecode = adb_pull_process.wait(500)

        if adb_pull_process_ecode != 0:
            print("Screenshot pull failed. Exiting")
            print(adb_pull_process.stdout.read().decode("utf-8"))
            return

        # sleep for 1 second to get capture time
        time.sleep(1)

        # remove data from user sdcard
        self.adb.command(
            ["shell", "rm", f"/sdcard/{uid}.png"], device_id=self._device_id
        )
        print(
            "[LOG] Screenshot captured. Saved to {cfgpath}".format(
                cfgpath=os.path.dirname(self.guiscrcpy_mapper_json)
            )
        )
        return os.path.join(
            os.path.dirname(self.guiscrcpy_mapper_json), "{uid}.png".format(uid=uid)
        )

    # The following functions handle key events on the mapper
    def on_key_press(self, key):
        try:
            if key.char in self.config.keys():
                print("[KEY] Hotkey command executing")
                position_to_tap = self.config.get(key.char)
                c = self.adb.command(
                    ["shell", "input", "tap", *[str(x) for x in position_to_tap]],
                    device_id=self.get_device_id(),
                )
                print(c.stdout.read().decode("utf-8"))
                print("[KEY][COMPLETE]")

        except AttributeError:
            if key == keyboard.Key.shift:
                print("[KEY][MOD] Shift key")
            elif key == keyboard.Key.esc:
                print("[KEY][ESC] Aborting")
                return False
            else:
                print("[KEY][MOD] Special key {}".format(key))

    def listen_keypress(self):
        """
        Listens to keypress using pynput and executes self.on_key_press
        on every keydown
        :return:
        :rtype:
        """
        print("[SERVER] LISTENING VALUES:" "Your keys are being listened by server. ")
        try:
            with keyboard.Listener(on_press=self.on_key_press) as listener:
                listener.join()
        except KeyboardInterrupt:
            print("guiscrcpy-mapper aborted on user request")

    # configuration
    def read_configuration(self):
        if not os.path.exists(self.guiscrcpy_mapper_json):
            self.create_configuration()
        with open(self.guiscrcpy_mapper_json, "r", encoding="utf-8") as f:
            self.config.update(json.load(f))

    def create_configuration(self):
        with open(self.guiscrcpy_mapper_json, "w") as w:
            json.dump(self.config, w)
        print("Wrote configuration file.")

    def add_position(self, char, position):
        self.config[char] = position

    # initialize
    def initialize(self, initialize_qt=False):
        """
        Initializes GUI to set keys to mapper
        :return:
        :rtype:
        """
        print("Setting up guiscrcpy-mapper for the first time use...")
        print("Intializing GUI window")
        if __name__ == "__main__" or initialize_qt:
            print("Creating QtCore window Application instance")
            self.app = QtWidgets.QApplication([])
        self.window = MapperUI(
            self,
            self.get_screenshot(),
            self.dimensions,
            fixed_pos=fixed_pos,
            final_pos=final_pos,
        )
        self.app.processEvents()
        self.app.exec_()


class MapperAsync(QThread):
    def __init__(self, parent, device_id, adb, initialize=True, config_path=None):
        QThread.__init__(self, parent)
        self.parent = parent
        self.adb = adb
        self.device_id = device_id
        self.initialize = initialize
        self._config_path = config_path

    def run(self):
        mp = Mapper(self.device_id, adb=self.adb, config_path=self._config_path)
        if self.initialize:
            mp.initialize(initialize_qt=False)
        else:
            mp.read_configuration()
            mp.listen_keypress()