Nekmo/dirhunt

View on GitHub
dirhunt/processors.py

Summary

Maintainability
D
1 day
Test Coverage
# -*- coding: utf-8 -*-
import re

from bs4 import Comment
from colorama import Fore, Back

from dirhunt.colors import status_code_colors
from dirhunt.crawler_url import CrawlerUrl
from dirhunt.url import Url
from dirhunt.utils import colored

INDEX_FILES = ['index.php', 'index.html', 'index.html']
ACCEPTED_PROTOCOLS = ['http', 'https']


def full_url_address(address, url):
    """

    :type url: Url
    :type address: str
    :rtype :Url

    """
    if address is None:
        return
    protocol_match = address.split(':', 1)[0] if ':' in address else ''
    protocol_match = re.match('^([A-z0-9\\-]+)$', protocol_match)
    if protocol_match and protocol_match.group(1) not in ACCEPTED_PROTOCOLS:
        return
    # TODO: mejorar esto. Aceptar otros protocolos  a rechazar
    if address.startswith('//'):
        address = address.replace('//', '{}://'.format(url.protocol), 1)
    if '://' not in address or address.startswith('/'):
        url = url.copy()
        url.path = address
        return url
    url = Url(address)
    if url.is_valid():
        return url


class ProcessBase(object):
    name = ''
    key_name = ''
    index_file = None
    status_code = 0

    def __init__(self, response, crawler_url):
        """

        :type crawler_url: CrawlerUrl
        """
        # TODO: hay que pensar en no pasar response, text y soup por aquí para establecerlo en self,
        # para no llenar la memoria. Deben ser cosas "volátiles".
        if response is not None:
            self.status_code = response.status_code
        # TODO: procesar otras cosas (css, etc.)
        self.crawler_url = crawler_url

    def search_index_files(self):
        if self.crawler_url.type not in ['directory', None]:
            return
        crawler = self.crawler_url.crawler
        for index_file in INDEX_FILES:
            url = self.crawler_url.url.copy()
            url.set_children(index_file)
            future = self.crawler_url.crawler.add_url(
                CrawlerUrl(crawler, url, self.crawler_url.depth - 1, self, None, 'document',
                           timeout=self.crawler_url.timeout), True)
            if self.crawler_url.crawler.closing:
                return
            result = future.result()
            if result.exists:
                self.index_file = url
                break

    @classmethod
    def is_applicable(cls, request, text, crawler_url, soup):
        raise NotImplementedError

    def process(self, text, soup=None):
        raise NotImplementedError

    @property
    def flags(self):
        return {self.key_name}

    def maybe_directory(self):
        return self.crawler_url.maybe_directory()

    def url_line(self):
        body = colored('[{}]'.format(self.status_code), status_code_colors(self.status_code))
        body += ' {} '.format(self.crawler_url.url.url)
        body += colored(' ({})'.format(self.name or self.__class__.__name__), Fore.LIGHTYELLOW_EX)
        return body

    def __str__(self):
        body = self.url_line()
        if self.index_file:
            body += colored('\n    Index file found: ', Fore.BLUE)
            body += '{}'.format(self.index_file.name)
        return body


class Error(ProcessBase):

    name = 'Error'
    key_name = 'error'

    def __init__(self, crawler_url, error):
        super(Error, self).__init__(None, crawler_url)
        self.error = error

    def process(self, text, soup=None):
        pass

    def __str__(self):
        body = colored('[ERROR]', Back.LIGHTRED_EX, Fore.LIGHTWHITE_EX)
        body += ' {} '.format(self.crawler_url.url.url)
        body += colored('({})'.format(self.error), Fore.LIGHTYELLOW_EX)
        return body

    @classmethod
    def is_applicable(cls, request, text, crawler_url, soup):
        pass


class GenericProcessor(ProcessBase):
    name = 'Generic'
    key_name = 'generic'

    def process(self, text, soup=None):
        self.search_index_files()


class ProcessRedirect(ProcessBase):
    name = 'Redirect'
    key_name = 'redirect'
    redirector = None

    def __init__(self, response, crawler_url):
        super(ProcessRedirect, self).__init__(response, crawler_url)
        self.redirector = full_url_address(response.headers.get('Location'), self.crawler_url.url)

    def process(self, text, soup=None):
        self.crawler_url.crawler.add_url(CrawlerUrl(self.crawler_url.crawler, self.redirector, 3, self.crawler_url,
                                                    timeout=self.crawler_url.timeout))

    @classmethod
    def is_applicable(cls, request, text, crawler_url, soup):
        return 300 <= request.status_code < 400

    def __str__(self):
        body = super(ProcessRedirect, self).__str__()
        body += colored('\n    Redirect to: ', Fore.BLUE)
        body += '{}'.format(self.redirector.address)
        return body


class ProcessNotFound(ProcessBase):
    name = 'Not Found'
    key_name = 'not_found'

    def process(self, text, soup=None):
        self.search_index_files()

    @classmethod
    def is_applicable(cls, request, text, crawler_url, soup):
        return request.status_code == 404

    def __str__(self):
        body = self.url_line()
        if self.crawler_url.exists:
            body += colored(' (FAKE 404)', Fore.YELLOW)
        if self.index_file:
            body += '\n    Index file found: {}'.format(self.index_file.name)
        return body

    @property
    def flags(self):
        flags = super(ProcessNotFound, self).flags
        if self.crawler_url.exists:
            flags.update({'{}.fake'.format(self.key_name)})
        return flags


class ProcessHtmlRequest(ProcessBase):
    name = 'HTML document'
    key_name = 'html'

    def process(self, text, soup=None):
        self.assets(soup)
        self.links(soup)
        self.search_index_files()

    def links(self, soup):
        links = [full_url_address(link.attrs.get('href'), self.crawler_url.url)
                 for link in soup.find_all('a')]
        for link in filter(bool, links):
            url = Url(link)
            if not url.is_valid():
                continue
            depth = self.crawler_url.depth
            if url.domain != self.crawler_url.url.domain or \
                    not url.path.startswith(self.crawler_url.url.directory_path):
                depth -= 1
            if depth <= 0:
                continue
            self.crawler_url.crawler.add_url(CrawlerUrl(self.crawler_url.crawler, link, depth, self.crawler_url,
                                                        timeout=self.crawler_url.timeout))

    def assets(self, soup):
        assets = [full_url_address(link.attrs.get('href'), self.crawler_url.url)
                   for link in soup.find_all('link')]
        assets += [full_url_address(script.attrs.get('src'), self.crawler_url.url)
                   for script in soup.find_all('script')]
        assets += [full_url_address(img.attrs.get('src'), self.crawler_url.url)
                   for img in soup.find_all('img')]
        for asset in filter(bool, assets):
            self.analyze_asset(asset)
            self.crawler_url.crawler.add_url(CrawlerUrl(self.crawler_url.crawler, asset, 3, self.crawler_url,
                                                        type='asset', timeout=self.crawler_url.timeout))

    def analyze_asset(self, asset):
        """

        :type asset: Url
        """
        if 'wordpress' not in self.crawler_url.flags and 'wp-content' in asset.path:
            self.crawler_url.flags.update({'wordpress'})
            # Override type always except for root path
            self.crawler_url.type = 'rewrite' if self.crawler_url.type != 'directory' else 'directory'
            self.crawler_url.depth -= 1

    @classmethod
    def is_applicable(cls, response, text, crawler_url, soup):
        return response.headers.get('Content-Type', '').lower().startswith('text/html') and response.status_code < 300 \
               and soup is not None


class ProcessIndexOfRequest(ProcessHtmlRequest):
    name = 'Index Of'
    key_name = 'index_of'
    files = None
    index_titles = ('index of', 'directory listing for')

    def process(self, text, soup=None):
        links = [full_url_address(link.attrs.get('href'), self.crawler_url.url)
                   for link in soup.find_all('a')]
        for link in filter(lambda x: x.url.endswith('/'), links):
            self.crawler_url.crawler.add_url(CrawlerUrl(self.crawler_url.crawler, link, 3, self.crawler_url,
                                                        type='directory', timeout=self.crawler_url.timeout))
        self.files = [Url(link) for link in links]

    def interesting_ext_files(self):
        return filter(lambda x: x.name.split('.')[-1] in self.crawler_url.crawler.interesting_extensions, self.files)

    def interesting_name_files(self):
        return filter(lambda x: x.name in self.crawler_url.crawler.interesting_files, self.files)

    def interesting_files(self):
        for iterator in [self.interesting_ext_files(), self.interesting_name_files()]:
            for file in iterator:
                yield file

    def __str__(self):
        body = super(ProcessIndexOfRequest, self).__str__()
        ext_files = list(self.interesting_ext_files())
        name_files = list(self.interesting_name_files())
        if ext_files:
            body += colored('\n    Interesting extension files:', Fore.BLUE)
            body += ' {}'.format(', '.join(map(lambda x: x.name, ext_files)))
        if name_files:
            body += colored('\n    Interesting file names:', Fore.MAGENTA)
            body += ' {}'.format(', '.join(map(lambda x: x.name, name_files)))
        if not ext_files and not name_files:
            body += colored(' (Nothing interesting)', Fore.LIGHTYELLOW_EX)
        return body

    @classmethod
    def is_applicable(cls, response, text, crawler_url, soup):
        if not super(ProcessIndexOfRequest, cls).is_applicable(response, text, crawler_url, soup):
            return False
        title = soup.find('title')
        if not title:
            return False
        title = title.text.lower()
        for index_title in cls.index_titles:
            if title.startswith(index_title):
                return True
        return False

    @property
    def flags(self):
        flags = super(ProcessHtmlRequest, self).flags
        try:
            next(self.interesting_files())
        except StopIteration:
            flags.update({'{}.nothing'.format(self.key_name)})
        return flags


class ProcessBlankPageRequest(ProcessHtmlRequest):
    name = 'Blank page'
    key_name = 'blank'

    @classmethod
    def is_applicable(cls, response, text, crawler_url, soup):
        if not super(ProcessBlankPageRequest, cls).is_applicable(response, text, crawler_url, soup):
            return False

        def tag_visible(element):
            if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
                return False
            if isinstance(element, Comment):
                return False
            return True
        texts = soup.findAll(text=True)
        visible_texts = filter(tag_visible, texts)
        for text in visible_texts:
            if text.strip():
                return False
        return True


def get_processor(response, text, crawler_url, soup):
    for processor_class in PROCESSORS:
        if processor_class.is_applicable(response, text, crawler_url, soup):
            # TODO: resp por None
            return processor_class(response, crawler_url)


PROCESSORS = [
    ProcessRedirect,
    ProcessNotFound,
    ProcessIndexOfRequest,
    ProcessBlankPageRequest,
    ProcessHtmlRequest,
]