endremborza/aswan

View on GitHub
aswan/url_handler.py

Summary

Maintainability
A
45 mins
Test Coverage
import json
import random
from typing import TYPE_CHECKING, Iterable, List, Optional, Type, Union
from urllib.parse import urljoin

import requests
from atqo import Capability, CapabilitySet
from bs4 import BeautifulSoup, Tag
from structlog import get_logger

from .constants import WE_REG_K, WE_SOURCE_K
from .models import RegEvent
from .resources import Caps
from .security import DEFAULT_PROXY, ProxyBase
from .utils import add_url_params

if TYPE_CHECKING:
    from selenium.webdriver import Chrome  # pragma: no cover

logger = get_logger()


class UrlHandlerBase:
    test_urls: Iterable[str] = []
    url_root: Optional[str] = None
    proxy_cls: Type[ProxyBase] = DEFAULT_PROXY

    max_in_parallel: Optional[int] = None
    max_retries: int = 2
    initiation_retries: int = 2
    wait_on_initiation_fail: int = 20
    restart_session_after: int = 50
    process_indefinitely: bool = False

    def __init__(self):
        self.proxy = self.proxy_cls()
        self._registered_links = []
        self._url: Optional[str] = None

    def set_url(self, url):
        self._url = url

    def pre_parse(self, blob):
        return blob

    def load_cache(self, url):
        return None

    def register_links_to_handler(
        self,
        links: Iterable[str],
        handler_cls: Optional[Type["ANY_HANDLER_T"]] = None,
        overwrite: bool = False,
    ):
        if handler_cls is None:
            handler_cls = type(self)
        self._registered_links += [
            RegEvent(
                url=self.extend_link(link),
                handler=handler_cls.__name__,
                overwrite=overwrite,
            )
            for link in links
        ]

    def register_url_with_params(self, params: dict):
        parsed_params = {k: v for k, v in params.items() if v}
        if not parsed_params:
            return
        next_url = add_url_params(self._url, parsed_params)
        self.register_links_to_handler([next_url])

    def pop_registered_links(self) -> List["RegEvent"]:
        out = self._registered_links
        self._registered_links = []
        return out

    def get_caps(self):
        caps = [*self.proxy.caps]
        if isinstance(self, BrowserHandler):
            if not self.headless:  # pragma: no cover
                caps.append(Caps.display)
            if self.eager:  # pragma: no cover
                caps.append(Caps.eager_browser)  # how to test?
            else:
                caps.append(Caps.normal_browser)
        else:
            caps.append(Caps.simple)
        if self.max_in_parallel is not None:
            caps.append(Capability({self.name: 1}, name=f"{self.name}-cap"))
        return CapabilitySet(caps)

    @classmethod
    def extend_link(cls, link: str) -> str:
        if isinstance(link, Tag):
            link = link["href"]
        return urljoin(cls.url_root, link)

    @property
    def name(self):
        return type(self).__name__

    @staticmethod
    def get_retry_sleep_time():
        return random.uniform(0.1, 0.6)

    @staticmethod
    def get_sleep_time():
        return 0


class RequestHandler(UrlHandlerBase):
    def parse(self, blob):
        return blob

    def handle_driver(self, session: "requests.Session"):
        """runs before get. can set/update cookies/headers"""

    def start_session(self, session: "requests.Session"):
        """starts session if no browser needed"""

    def is_session_broken(self, result: Union[int, Exception]):
        """either response code, or exception"""
        # for determining to restart the session,
        # or proceed with handling connection error
        if isinstance(result, int):
            return result != 404
        return True


class BrowserHandler(UrlHandlerBase):
    headless: bool = False
    eager: bool = False

    def parse(self, source):
        return source

    def handle_driver(self, driver: "Chrome"):
        """runs after get. if returns something, that is forwarded to parse"""

    def start_session(self, browser: "Chrome"):
        """start session if browser is needed"""

    def is_session_broken(self, result: Exception):
        """exception when getting source

        if error occurs during handle browser, it comes here
        """
        return False


class WebExtHandler(UrlHandlerBase):
    max_in_parallel = 1
    wait_time = 1
    max_retries = 5
    restart_session_after = float("inf")

    def parse(self, we_resp: bytes):
        we_resp_dic: dict = json.loads(we_resp)
        self.register_links_to_handler(we_resp_dic.get(WE_REG_K, []))
        return we_resp_dic[WE_SOURCE_K]

    def handle_driver(self, app):
        pass

    def start_session(self, _):
        pass

    def is_session_broken(self, result: Union[int, Exception]):
        return False


ANY_HANDLER_T = Union[RequestHandler, BrowserHandler]


class _SoupMixin:
    def pre_parse(self, blob: bytes):
        from bs4 import BeautifulSoup

        return BeautifulSoup(blob, "html5lib")

    def parse(self, soup: "BeautifulSoup"):
        """parse bs4 soup"""
        return soup


class _JsonMixin:
    def pre_parse(self, blob: bytes):
        # except JSONDecodeError:
        # json.loads(BeautifulSoup(resp, "html5lib").text)
        return json.loads(blob)

    def parse(self, obj: dict):
        """if parses_json is true"""
        return obj


class RequestSoupHandler(_SoupMixin, RequestHandler):
    pass


class RequestJsonHandler(_JsonMixin, RequestHandler):
    pass


class BrowserSoupHandler(_SoupMixin, BrowserHandler):
    pass


class BrowserJsonHandler(_JsonMixin, BrowserHandler):
    pass