AlexMathew/scrapple

View on GitHub
scrapple/commands/run.py

Summary

Maintainability
D
1 day
Test Coverage
"""
scrapple.commands.run
~~~~~~~~~~~~~~~~~~~~~

"""

from __future__ import print_function

import os

from colorama import Back, Fore, init

from scrapple.commands import command
from scrapple.selectors.css import CssSelector
from scrapple.selectors.xpath import XpathSelector
from scrapple.utils.config import (InvalidConfigException, extract_fieldnames,
                                   traverse_next, validate_config)


class RunCommand(command.Command):
    """
    Defines the execution of :ref:`run <command-run>`
    """

    def __init__(self, args):
        """
        Store the parsed CLI arguments (via the base class) and initialise \
        colorama so the coloured status messages can be printed.
        """
        super(RunCommand, self).__init__(args)
        init()

    def execute_command(self):
        """
        The run command implements the web content extractor corresponding to the given \
        configuration file.

        The execute_command() validates the verbosity level and the input project name, \
        opens the JSON configuration file and validates it. The run() method handles the \
        execution of the extractor run.

        The extractor implementation follows these primary steps :

        1. Selects the appropriate :ref:`selector class <implementation-selectors>` through \
        a dynamic dispatch, with the selector_type argument from the CLI input.

        #. Iterate through the data section in level-0 of the configuration file. \
        On each data item, call the extract_content() method from the selector class to \
        extract the content according to the specified extractor rule.

        #. If there are multiple levels of the extractor, i.e, if there is a 'next' \
        attribute in the configuration file, call the traverse_next() \
        :ref:`utility function <implementation-utils>` and parse through successive levels \
        of the configuration file.

        #. According to the --output_type argument, the result data is saved in a JSON \
        document or a CSV document.

        Each stage has its own error handler, so a failure is reported with a message \
        describing that stage (invalid verbosity, missing configuration file, malformed \
        JSON, invalid configuration, or failure to save the results).

        """
        # Stage 1: verbosity. Kept in its own try block so that a ValueError
        # raised by a later stage is never reported as a verbosity problem.
        try:
            self.args['--verbosity'] = int(self.args['--verbosity'])
            if self.args['--verbosity'] not in [0, 1, 2]:
                raise ValueError
        except ValueError:
            print(Back.WHITE + Fore.RED + "Use 0, 1 or 2 for verbosity." \
                + Back.RESET + Fore.RESET, sep="")
            return

        if self.args['--verbosity'] > 0:
            print(Back.GREEN + Fore.BLACK + "Scrapple Run")
            print(Back.RESET + Fore.RESET)

        # Stage 2: load the project's JSON configuration file.
        import json
        try:
            with open(self.args['<projectname>'] + '.json', 'r') as f:
                self.config = json.load(f)
        except IOError:
            print(Back.WHITE + Fore.RED + self.args['<projectname>'], ".json does not ", \
                  "exist. Use ``scrapple genconfig``." + Back.RESET + Fore.RESET, sep="")
            return
        except ValueError as e:
            # json.load signals malformed JSON with ValueError (JSONDecodeError
            # is a ValueError subclass on Python 3).
            print(Back.WHITE + Fore.RED + self.args['<projectname>'], \
                  ".json is not a valid JSON document: ", \
                  str(e) + Back.RESET + Fore.RESET, sep="")
            return

        # Stage 3: validate the configuration.
        try:
            validate_config(self.config)
        except InvalidConfigException as e:
            # The exception must be converted explicitly; concatenating it
            # with a str raises TypeError.
            print(Back.WHITE + Fore.RED + str(e) + Back.RESET + Fore.RESET, sep="")
            return

        # Stage 4: run the extractor. Extraction errors are handled inside
        # run(); IOError / ValueError escaping from it come from saving the
        # results, so they are reported as such.
        try:
            self.run()
        except (IOError, ValueError) as e:
            print(Back.WHITE + Fore.RED + "Could not save the results: " + str(e) \
                + Back.RESET + Fore.RESET, sep="")

    def run(self):
        """
        Run the extractor described by ``self.config`` and save the results.

        The selector class is chosen from the configuration's ``selector_type``. \
        Whatever has been collected is always written out, even if the extraction \
        is interrupted with Ctrl-C or fails part-way: a KeyboardInterrupt is \
        swallowed deliberately and any other extraction error is printed.
        """
        selector_classes = {
            'xpath': XpathSelector,
            'css': CssSelector
        }
        selector_class = selector_classes.get(self.config['selector_type'].lower())
        results = {
            'project': self.args['<projectname>'],
            'data': list()
        }
        # Maps each tabular header to the order in which it was first seen.
        tabular_data_headers = dict()
        try:
            self._extract(selector_class, results, tabular_data_headers)
        except KeyboardInterrupt:
            # Deliberate: stop crawling but still save the partial results.
            pass
        except Exception as e:
            print(e)
        finally:
            self._write_output(results, tabular_data_headers)

    @staticmethod
    def _register_headers(known_headers, new_headers):
        """Record each unseen header in known_headers with its first-seen position."""
        for header in new_headers:
            if header not in known_headers:
                known_headers[header] = len(known_headers)

    def _extract(self, selector_class, results, tabular_data_headers):
        """
        Load the level-0 page, extract its fields and tables, follow the 'next' \
        levels if configured, and append the rows to ``results['data']``.
        """
        scraping = self.config['scraping']
        verbosity = self.args['--verbosity']
        result = dict()
        if verbosity > 0:
            print()
            print(Back.YELLOW + Fore.BLUE + "Loading page ", scraping['url'] \
                + Back.RESET + Fore.RESET, end='')
        selector = selector_class(scraping['url'])
        for attribute in scraping['data']:
            if attribute['field'] != "":
                if verbosity > 1:
                    print("\nExtracting", attribute['field'], "attribute", sep=' ', end='')
                result[attribute['field']] = selector.extract_content(**attribute)

        # Default to the plain level-0 result, so result_list is always bound
        # even when a 'table' section exists but no entry has a usable selector.
        result_list = [result]
        for table in scraping.get('table') or []:
            if table.get('selector', '').strip() != '':
                table.update({
                    'result': result,
                    'verbosity': verbosity
                })
                # NOTE(review): with several tables only the rows of the last
                # one are kept (original behaviour) - confirm this is intended.
                table_headers, result_list = selector.extract_tabular(**table)
                self._register_headers(tabular_data_headers, table_headers)

        if not scraping.get('next'):
            results['data'].extend(result_list)
        else:
            for nextx in scraping['next']:
                for tdh, r in traverse_next(selector, nextx, result, verbosity=verbosity):
                    results['data'].append(r)
                    self._register_headers(tabular_data_headers, tdh)

    def _write_output(self, results, tabular_data_headers):
        """
        Save ``results`` as <output_filename>.json or <output_filename>.csv in the \
        current working directory, according to --output_type, and report it.
        """
        output_type = self.args['--output_type']
        if output_type == 'json':
            import json
            with open(os.path.join(os.getcwd(), self.args['<output_filename>'] + '.json'), \
                'w') as f:
                json.dump(results, f, indent=4)
        elif output_type == 'csv':
            import csv
            with open(os.path.join(os.getcwd(), self.args['<output_filename>'] + '.csv'), \
                'w') as f:
                fields = extract_fieldnames(self.config)
                # Tabular headers follow the configured fields, in first-seen order.
                data_headers = sorted(tabular_data_headers, key=lambda x: tabular_data_headers[x])
                fields.extend(data_headers)
                writer = csv.DictWriter(f, fieldnames=fields)
                writer.writeheader()
                writer.writerows(results['data'])
        if self.args['--verbosity'] > 0:
            print()
            print(Back.WHITE + Fore.RED + self.args['<output_filename>'], \
                  ".", output_type, " has been created" \
                  + Back.RESET + Fore.RESET, sep="")