Konano/arknights-mower

View on GitHub
arknights_mower/utils/device/minitouch/core.py

Summary

Maintainability
A
3 hrs
Test Coverage
from __future__ import annotations

import os
import time
# import random
from typing import Union

from ... import config
from ...log import log_sync, logger
from ..adb_client import ADBClient
from ..utils import download_file
from .command import CommandBuilder
from .session import Session

# MNT_PREBUILT_URL = 'https://github.com/williamfzc/stf-binaries/raw/master/node_modules/minitouch-prebuilt/prebuilt'
MNT_PREBUILT_URL = 'https://oss.nano.ac/arknights_mower/minitouch'
MNT_PATH = '/data/local/tmp/minitouch'


class Client(object):
    """ Use minitouch to control Android devices easily """

    def __init__(self, client: ADBClient, touch_device: str = config.MNT_TOUCH_DEVICE) -> None:
        self.client = client
        self.touch_device = touch_device
        self.process = None
        self.start()

    def start(self) -> None:
        self.__install()
        self.__server()

    def __del__(self) -> None:
        self.stop()

    def stop(self) -> None:
        self.__server_stop()

    def __install(self) -> None:
        """ install minitouch for android devices """
        self.abi = self.__get_abi()
        if self.__is_mnt_existed():
            logger.debug(
                f'minitouch already existed in {self.client.device_id}')
        else:
            self.__download_mnt()

    def __get_abi(self) -> str:
        """ query device ABI """
        abi = self.client.cmd_shell('getprop ro.product.cpu.abi', True).strip()
        logger.debug(f'device_abi: {abi}')
        return abi

    def __is_mnt_existed(self) -> bool:
        """ check if minitouch is existed in the device """
        file_list = self.client.cmd_shell('ls /data/local/tmp', True)
        return 'minitouch' in file_list

    def __download_mnt(self) -> None:
        """ download minitouch """
        url = f'{MNT_PREBUILT_URL}/{self.abi}/bin/minitouch'
        logger.info(f'minitouch url: {url}')
        mnt_path = download_file(url)

        # push and grant
        self.client.cmd_push(mnt_path, MNT_PATH)
        self.client.cmd_shell(f'chmod 777 {MNT_PATH}')
        logger.info('minitouch already installed in {MNT_PATH}')

        # remove temp
        os.remove(mnt_path)

    def __server(self) -> None:
        """ execute minitouch with adb shell """
        # self.port = self.__get_port()
        self.port = config.MNT_PORT
        self.__forward_port()
        self.process = None
        r, self.stderr = os.pipe()
        log_sync('minitouch', r).start()
        self.__start_mnt()

        # make sure minitouch is up
        time.sleep(1)
        if not self.check_mnt_alive(False):
            raise RuntimeError('minitouch did not work. see https://github.com/Konano/arknights-mower/issues/82')

    def __server_stop(self) -> None:
        """ stop minitouch """
        self.process and self.process.kill()

    # def __get_port(cls) -> int:
    #     """ get a random port from port set """
    #     while True:
    #         port = random.choice(list(range(20000, 21000)))
    #         if is_port_using(DEFAULT_HOST, port):
    #             return port

    def __forward_port(self) -> None:
        """ allow pc access minitouch with port """
        output = self.client.cmd(
            f'forward tcp:{self.port} localabstract:minitouch')
        logger.debug(f'output: {output}')

    def __start_mnt(self) -> None:
        """ fork a process to start minitouch on android """
        if self.touch_device is None:
            self.process = self.client.process('/data/local/tmp/minitouch', [], self.stderr)
        else:
            self.process = self.client.process('/data/local/tmp/minitouch', ['-d', self.touch_device], self.stderr)

    def check_mnt_alive(self, restart: bool = True) -> bool:
        """ check if minitouch process alive """
        if self.process and self.process.poll() is None:
            return True
        elif restart:
            self.__server_stop()
            self.__forward_port()
            self.__start_mnt()
            time.sleep(1)
            if not (self.process and self.process.poll() is None):
                raise RuntimeError('minitouch did not work. see https://github.com/Konano/arknights-mower/issues/82')
            return True
        return False

    def check_adb_alive(self) -> bool:
        """ check if adb server alive """
        return self.client.check_server_alive()

    def convert_coordinate(self, point: tuple[int, int], display_frames: tuple[int, int, int], max_x: int, max_y: int) -> tuple[int, int]:
        """
        check compatibility mode and convert coordinate
        see details: https://github.com/Konano/arknights-mower/issues/85
        """
        if not config.MNT_COMPATIBILITY_MODE:
            return point
        x, y = point
        w, h, r = display_frames
        if r == 1:
            return [(h - y) * max_x // h, x * max_y // w]
        if r == 3:
            return [y * max_x // h, (w - x) * max_y // w]
        logger.debug(f'warning: unexpected rotation parameter: display_frames({w}, {h}, {r})')
        return point

    def tap(self, points: list[tuple[int, int]], display_frames: tuple[int, int, int], pressure: int = 100, duration: int = None, lift: bool = True) -> None:
        """
        tap on screen with pressure and duration

        :param points: list[int], look like [(x1, y1), (x2, y2), ...]
        :param display_frames: tuple[int, int, int], which means [weight, high, rotation] by "adb shell dumpsys window | grep DisplayFrames"
        :param pressure: default to 100
        :param duration: in milliseconds
        :param lift: if True, "lift" the touch point
        """
        self.check_adb_alive()
        self.check_mnt_alive()

        builder = CommandBuilder()
        points = [list(map(int, point)) for point in points]
        with Session(self.port) as conn:
            for id, point in enumerate(points):
                x, y = self.convert_coordinate(point, display_frames, int(conn.max_x), int(conn.max_y))
                builder.down(id, x, y, pressure)
            builder.commit()

            if duration:
                builder.wait(duration)
                builder.commit()

            if lift:
                for id in range(len(points)):
                    builder.up(id)

            builder.publish(conn)

    def __swipe(self, points: list[tuple[int, int]], display_frames: tuple[int, int, int], pressure: int = 100, duration: Union[list[int], int] = None, up_wait: int = 0, fall: bool = True, lift: bool = True) -> None:
        """
        swipe between points one by one, with pressure and duration

        :param points: list, look like [(x1, y1), (x2, y2), ...]
        :param display_frames: tuple[int, int, int], which means [weight, high, rotation] by "adb shell dumpsys window | grep DisplayFrames"
        :param pressure: default to 100
        :param duration: in milliseconds
        :param up_wait: in milliseconds
        :param fall: if True, "fall" the first touch point
        :param lift: if True, "lift" the last touch point
        """
        self.check_adb_alive()
        self.check_mnt_alive()

        points = [list(map(int, point)) for point in points]
        if not isinstance(duration, list):
            duration = [duration] * (len(points) - 1)
        assert len(duration) + 1 == len(points)

        builder = CommandBuilder()
        with Session(self.port) as conn:
            if fall:
                x, y = self.convert_coordinate(points[0], display_frames, int(conn.max_x), int(conn.max_y))
                builder.down(0, x, y, pressure)
                builder.publish(conn)

            for idx, point in enumerate(points[1:]):
                x, y = self.convert_coordinate(point, display_frames, int(conn.max_x), int(conn.max_y))
                builder.move(0, x, y, pressure)
                if duration[idx-1]:
                    builder.wait(duration[idx-1])
                builder.commit()
            builder.publish(conn)

            if lift:
                builder.up(0)
                if up_wait:
                    builder.wait(up_wait)
                builder.publish(conn)

    def swipe(self, points: list[tuple[int, int]], display_frames: tuple[int, int, int], pressure: int = 100, duration: Union[list[int], int] = None, up_wait: int = 0, part: int = 10, fall: bool = True, lift: bool = True) -> None:
        """
        swipe between points one by one, with pressure and duration
        it will split distance between points into pieces

        :param points: list, look like [(x1, y1), (x2, y2), ...]
        :param display_frames: tuple[int, int, int], which means [weight, high, rotation] by "adb shell dumpsys window | grep DisplayFrames"
        :param pressure: default to 100
        :param duration: in milliseconds
        :param up_wait: in milliseconds
        :param part: default to 10
        :param fall: if True, "fall" the first touch point
        :param lift: if True, "lift" the last touch point
        """
        points = [list(map(int, point)) for point in points]
        if not isinstance(duration, list):
            duration = [duration] * (len(points) - 1)
        assert len(duration) + 1 == len(points)
        
        new_points = [points[0]]
        new_duration = []
        for id in range(1, len(points)):
            pre_point = points[id-1]
            cur_point = points[id]
            offset = (
                (cur_point[0] - pre_point[0]) // part,
                (cur_point[1] - pre_point[1]) // part,
            )
            new_points += [
                (pre_point[0] + i * offset[0], pre_point[1] + i * offset[1])
                for i in range(1, part+1)
            ]
            if duration[id-1] is None:
                new_duration += [None] * part
            else:
                new_duration += [duration[id-1] // part] * part
        self.__swipe(new_points, display_frames, pressure, new_duration, up_wait, fall, lift)