# endremborza/aswan: aswan/connection_session.py

import json
import multiprocessing as mp
import sys
import time
from dataclasses import dataclass
from functools import partial
from hashlib import md5
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Iterable, Optional

import requests
from atqo import ActorBase, SchedulerTask
from flask import Flask, make_response, request
from flask_cors import CORS
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import Chrome
from structlog import get_logger

from .constants import HEADERS, WE_URL_K, WE_URL_ROUTE, WEBEXT_PORT, Statuses
from .depot import AswanDepot
from .exceptions import BrokenSessionError, ConnectionError
from .models import CollEvent, RegEvent
from .resources import Caps
from .security import DEFAULT_PROXY, ProxyBase
from .url_handler import ANY_HANDLER_T, RequestSoupHandler, WebExtHandler
from .utils import add_url_params

logger = get_logger()

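# Known exception types map to dedicated statuses; anything else is recorded as a parsing error.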
EXCEPTION_STATUSES = {
    ConnectionError: Statuses.CONNECTION_ERROR,
    BrokenSessionError: Statuses.SESSION_BROKEN,
}
DEFAULT_EXCEPTION_STATUS = Statuses.PARSING_ERROR


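# Outcome of handling one URL: the collection event plus any links registered for later crawling.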
@dataclass
class UrlHandlerResult:
    event: CollEvent
    registered_links: list[RegEvent]


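# One unit of work: a handler paired with the URL it should process, convertible to an atqo SchedulerTask.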
@dataclass
class HandlingTask:
    handler: ANY_HANDLER_T
    url: str

    def get_scheduler_task(self) -> SchedulerTask:
        caps = self.handler.get_caps()
        return SchedulerTask(argument=self, requirements=caps)


class ConnectionSession(ActorBase):
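    """Actor that processes HandlingTasks through a request, browser, or web-extension session.

    The session type is chosen from the constructor flags; when a depot path is given, results
    are written to the depot's object store and integrated into its current event set.
    """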
    def __init__(
        self,
        depot_path: Optional[Path] = None,
        is_browser=False,
        headless=True,
        eager=False,
        proxy_cls: type[ProxyBase] = DEFAULT_PROXY,
        is_webext: bool = False,
    ):
        self.is_browser = is_browser
        self.eager = eager
        self._proxy = proxy_cls()
        if depot_path is not None:
            depot = AswanDepot(depot_path.name, depot_path.parent)
            self.current = depot.current.setup()
            self.store = depot.object_store
        else:
            self.current = None
            self.store = None

        self.session = (
            BrowserSession(headless, self.eager)
            if self.is_browser
            else (WebExtSession() if is_webext else RequestSession())
        )
        self._initiated_handlers = set()
        self._broken_handlers = set()
        self._num_queries = 0
        self.session.start(self._proxy)

    def consume(self, task: HandlingTask):
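        # Restart the session if this handler broke it earlier or its query budget is used up,
        # initiate the handler on first use, then answer from cache when possible and fetch otherwise.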
        handler_name = task.handler.name
        if (handler_name in self._broken_handlers) or (
            task.handler.restart_session_after < self._num_queries
        ):
            self._restart(new_proxy=False)

        if handler_name not in self._initiated_handlers:
            self._initiate_handler(task.handler)

        try:
            cached_resp = task.handler.load_cache(task.url)
        except Exception as e:
            logger.warning("error during cache loading", e=e, e_type=type(e))
            cached_resp = None
        if cached_resp is not None:
            status = (
                Statuses.PERSISTENT_CACHED
                if task.handler.process_indefinitely
                else Statuses.CACHE_LOADED
            )
            out = cached_resp
        else:
            task.handler.set_url(task.url)
            time.sleep(task.handler.get_sleep_time())
            out, status = self._get_out_and_status(task)
            if status == Statuses.SESSION_BROKEN:
                self._broken_handlers.add(handler_name)
            self._num_queries += 1
        self.proc_result(task, out, status)

    def stop(self):
        self.session.stop()

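    # Fetch and parse one URL, retrying up to handler.max_retries times.
    # An int returned by the session marks a non-200 HTTP status; exhausting the retries raises
    # ConnectionError, while a response the handler deems fatal raises BrokenSessionError.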
    def get_parsed_response(
        self, url, handler=RequestSoupHandler(), params: Optional[dict] = None
    ):
        if params:
            url = add_url_params(url, params)
        for attempt in range(handler.max_retries):
            try:
                content = self.session.get_response_content(handler, url)
                if not isinstance(content, int):
                    # an int here is a non-200 HTTP status code returned by the session
                    break
            except Exception as e:
                content = e
            self._log_miss("Failed Try", content, handler, attempt, "RETRY", url)
            if handler.is_session_broken(content):
                raise BrokenSessionError(f"error: {content}".split("\n")[0])
            time.sleep(handler.get_retry_sleep_time())
        else:
            raise ConnectionError(f"request resulted in error with status {content}")
        return handler.parse(handler.pre_parse(content))

    def proc_result(self, task: HandlingTask, out: Any, status: str):
        event = CollEvent(
            handler=task.handler.name,
            url=task.url,
            timestamp=int(time.time()),
            output_file=self.store.dump(out) if out is not None else "",
            status=status,
        )
        self.current.integrate_events([event, *task.handler.pop_registered_links()])

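    # Tear down and relaunch the underlying session, optionally rotating the proxy host,
    # and forget which handlers were already initiated or marked broken.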
    def _restart(self, new_proxy=True):
        self.session.stop()
        if new_proxy:
            self._proxy.set_new_host()
        self._initiated_handlers = set()
        self._broken_handlers = set()
        self.session.start(self._proxy)
        self._num_queries = 0

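    # Run the actual fetch-and-parse, translating any raised exception into a status
    # via EXCEPTION_STATUSES.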
    def _get_out_and_status(self, task: HandlingTask) -> tuple[Any, str]:
        try:
            out = self.get_parsed_response(task.url, task.handler)
            if task.handler.process_indefinitely:
                status = Statuses.PERSISTENT_PROCESSED
            else:
                status = Statuses.PROCESSED
        except Exception as e:
            out = _parse_exception(e)
            status = EXCEPTION_STATUSES.get(type(e), DEFAULT_EXCEPTION_STATUS)
            _h = task.handler
            self._log_miss("Gave Up", e, _h, _h.max_retries, status, task.url)
        return out, status

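    # Call handler.start_session, restarting the whole session and waiting between failed attempts.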
    def _initiate_handler(self, handler: ANY_HANDLER_T):
        for att in range(handler.initiation_retries):
            try:
                handler.start_session(self.session.driver)
                self._initiated_handlers.add(handler.name)
                return True
            except Exception as e:
                self._log_miss("Failed initiating handler", e, handler, att, "PRE", "")
                self._restart()
                time.sleep(handler.wait_on_initiation_fail)

    def _log_miss(self, msg, content, handler: ANY_HANDLER_T, attempt, status, url):
        out = _parse_exception(content)
        _info = out | {"proxy": self._proxy.host, "status": status}
        logger.warning(msg, handler=handler.name, url=url, attempt=attempt, **_info)


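# Selenium-backed session: drives a Chrome instance and returns either the handler's output
# or the raw page source.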
class BrowserSession:
    def __init__(self, headless: bool, eager: bool, show_images: bool = False):
        self._headless = headless
        self._eager = eager
        self._show_images = show_images
        self.driver: Optional[Chrome] = None

    def start(self, proxy: ProxyBase):
        chrome_options = proxy.get_chrome_options()
        if sys.platform == "linux":
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--no-sandbox")
        if self._headless:
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--headless")
        if self._eager:  # pragma: no cover
            chrome_options.page_load_strategy = "eager"
        if not self._show_images:
            prefs = {"profile.managed_default_content_settings.images": 2}
            chrome_options.add_experimental_option("prefs", prefs)

        logger.info(f"launching browser: {chrome_options.arguments}")
        self.driver = Chrome(options=chrome_options)
        logger.info("browser running")

    def stop(self):
        try:
            self.driver.close()
        except WebDriverException:  # pragma: no cover
            logger.warning("could not stop browser")

    def get_response_content(self, handler: ANY_HANDLER_T, url: str):
        self.driver.get(url)
        out = handler.handle_driver(self.driver)
        if out is not None:
            return out
        return self.driver.page_source.encode("utf-8")


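# Plain requests-backed session: returns the response body on success and the status code otherwise.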
class RequestSession:
    def __init__(self):
        self.driver: Optional[requests.Session] = None

    def start(self, proxy: ProxyBase):
        self.driver = requests.Session()
        self.driver.headers.update(HEADERS)
        self.driver.proxies.update(proxy.get_requests_dict())

    def stop(self):
        pass

    def get_response_content(self, handler: ANY_HANDLER_T, url: str):
        handler.handle_driver(self.driver)
        resp = self.driver.get(url)
        if resp.ok:
            return resp.content
        return resp.status_code


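# Session backed by a browser web extension: a small Flask app (WebextAppManager) run in a
# separate process hands URLs to the extension and collects what it posts back.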
class WebExtSession:
    def __init__(self) -> None:
        self.tmpdir = TemporaryDirectory()
        self.manager = WebextAppManager(self.tmpdir)
        self.proc = mp.Process(target=self.manager.run)
        self.driver = None

    def start(self, _: ProxyBase):
        if not self.proc.is_alive():
            self.proc.start()

    def stop(self):
        self.proc.kill()
        self.tmpdir.cleanup()

    def get_response_content(self, handler: WebExtHandler, url: str):
        self.manager.url_path.write_text(url)
        fpath = self.manager.get_fpath(url)
        for _ in range(handler.max_retries):
            time.sleep(handler.wait_time)
            if fpath.exists():
                return fpath.read_bytes()
        raise ValueError(f"{fpath} for {url} not processed")


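# Flask app used by WebExtSession: the extension polls it for the latest URL and posts back
# page content, which is written to a file named by the md5 of the URL.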
class WebextAppManager:
    def __init__(self, tmpdir: TemporaryDirectory) -> None:
        self.tmpdir = tmpdir
        self.url_path = Path(self.tmpdir.name, "url")
        self.url_path.write_text("")

    def handle_post(self):
        data = json.loads(request.data)
        url = data[WE_URL_K]
        self.get_fpath(url).write_bytes(request.data)
        return make_response("Page source saved successfully")

    def send_latest_url(self):
        for _ in range(10):
            latest_url = self.url_path.read_text()
            if self.get_fpath(latest_url).exists():
                time.sleep(3)
                continue
            return make_response(self.url_path.read_text())
        return make_response("https://myexternalip.com/json")

    def get_fpath(self, url: str):
        return Path(self.tmpdir.name, md5(url.encode()).hexdigest())

    def run(self):
        app = Flask(__name__)
        CORS(app)
        app.route("/", methods=["POST", "GET"])(self.handle_post)
        app.route(f"/{WE_URL_ROUTE}")(self.send_latest_url)
        app.run(host="0.0.0.0", port=WEBEXT_PORT)


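# Capability flags translate into ConnectionSession constructor kwargs.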
cap_to_kwarg = {
    Caps.display: dict(headless=False),
    Caps.normal_browser: dict(is_browser=True),
    Caps.eager_browser: dict(is_browser=True, eager=True),
}


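# Build (capabilities, session factory) pairs for the scheduler from a set of handlers.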
def get_actor_items(handlers: Iterable[ANY_HANDLER_T], depot_path: Path):
    # TODO: this is extremely hacky
    for handler in handlers:
        caps = handler.get_caps()
        full_kwargs = dict(proxy_cls=handler.proxy_cls, depot_path=depot_path)
        for cap in caps:
            full_kwargs.update(cap_to_kwarg.get(cap, {}))
        if isinstance(handler, WebExtHandler):
            full_kwargs["is_webext"] = True
        if (
            full_kwargs.get("is_browser")
            and full_kwargs.get("headless", True)
            and handler.proxy.needs_auth
        ):
            raise RuntimeError("can't have auth (extension) in headless browser")
        yield caps, partial(ConnectionSession, **full_kwargs)


def _parse_exception(e):
    # tbl = [tb.strip().split("\n") for tb in traceback.format_tb(e.__traceback__)]
    return {"e_type": type(e).__name__, "e_msg": str(e).split("\n")[0]}  # , "tb": tbl}