Source code for scrapple.commands.run



from __future__ import print_function

import os

from colorama import Back, Fore, init

from scrapple.commands import command
from scrapple.selectors.css import CssSelector
from scrapple.selectors.xpath import XpathSelector
from scrapple.utils.config import (InvalidConfigException, extract_fieldnames,
                                   traverse_next, validate_config)

class RunCommand(command.Command):
    """
    Defines the execution of :ref:`run <command-run>`
    """

    def __init__(self, args):
        """
        Store the parsed CLI arguments (via the base class) and initialise
        colorama so the coloured status messages are emitted correctly.

        :param args: Mapping of CLI arguments. This class reads the keys \
        ``--verbosity``, ``<projectname>``, ``--output_type`` and \
        ``<output_filename>``.
        """
        super(RunCommand, self).__init__(args)
        init()

    def execute_command(self):
        """
        The run command implements the web content extractor corresponding to the given
        configuration file.

        The execute_command() validates the verbosity argument and the input project
        name, opens the JSON configuration file and validates it. On success it hands
        over to run(), which handles the execution of the extractor run. Every failure
        is reported on stdout and the method returns without raising.

        The extractor implementation follows these primary steps :

        1. Selects the appropriate :ref:`selector class <implementation-selectors>` through
           a dynamic dispatch, with the selector_type argument from the CLI input.

        #. Iterate through the data section in level-0 of the configuration file.
           On each data item, call the extract_content() method from the selector class to
           extract the content according to the specified extractor rule.

        #. If there are multiple levels of the extractor, i.e, if there is a 'next'
           attribute in the configuration file, call the traverse_next()
           :ref:`utility function <implementation-utils>` and parse through successive levels
           of the configuration file.

        #. According to the --output_type argument, the result data is saved in a JSON
           document or a CSV document.

        """
        # Validate verbosity in its own try block so that a ValueError raised
        # while parsing the JSON file is not misreported as a verbosity error.
        try:
            self.args['--verbosity'] = int(self.args['--verbosity'])
            if self.args['--verbosity'] not in (0, 1, 2):
                raise ValueError
        except (TypeError, ValueError):
            print(Back.WHITE + Fore.RED + "Use 0, 1 or 2 for verbosity." \
                  + Back.RESET + Fore.RESET, sep="")
            return

        if self.args['--verbosity'] > 0:
            print(Back.GREEN + Fore.BLACK + "Scrapple Run")
            print(Back.RESET + Fore.RESET)

        import json
        config_path = self.args['<projectname>'] + '.json'
        try:
            with open(config_path, 'r') as f:
                self.config = json.load(f)
            validate_config(self.config)
        except IOError:
            print(Back.WHITE + Fore.RED + self.args['<projectname>'], ".json does not ", \
                  "exist. Use ``scrapple genconfig``." + Back.RESET + Fore.RESET, sep="")
            return
        except InvalidConfigException as e:
            # Listed before ValueError so a configuration error is always
            # reported with its own message. str() is required: concatenating
            # the exception object itself to a string raises TypeError.
            print(Back.WHITE + Fore.RED + str(e) + Back.RESET + Fore.RESET, sep="")
            return
        except ValueError as e:
            # json.load() signals malformed JSON with ValueError (or a subclass).
            print(Back.WHITE + Fore.RED + config_path, " could not be parsed: ", \
                  str(e) + Back.RESET + Fore.RESET, sep="")
            return

        # Only start the extractor once the configuration loaded and validated.
        self.run()

    def run(self):
        """
        Execute the extractor described by ``self.config`` and save the results.

        The level-0 page is loaded with the selector class chosen by the
        ``selector_type`` configuration value, each named field of the ``data``
        section is extracted, optional ``table`` entries are extracted through
        extract_tabular(), and ``next`` levels are followed with traverse_next().

        Whatever has been collected is written to the output file even when the
        run is interrupted (KeyboardInterrupt) or fails; other exceptions raised
        while scraping are printed rather than propagated.
        """
        selector_classes = {
            'xpath': XpathSelector,
            'css': CssSelector
        }
        selector_class = selector_classes.get(self.config['selector_type'].lower())
        results = dict()
        results['project'] = self.args['<projectname>']
        results['data'] = list()
        # Initialised before the try block because the finally clause reads them.
        result = dict()
        tabular_data_headers = dict()
        try:
            scraping = self.config['scraping']
            verbosity = self.args['--verbosity']
            if verbosity > 0:
                print()
                print(Back.YELLOW + Fore.BLUE + "Loading page ", scraping['url'] \
                      + Back.RESET + Fore.RESET, end='')
            selector = selector_class(scraping['url'])

            for attribute in scraping['data']:
                if attribute['field'] != "":
                    if verbosity > 1:
                        print("\nExtracting", attribute['field'], "attribute", sep=' ', end='')
                    result[attribute['field']] = selector.extract_content(**attribute)

            # Default to the plain level-0 result. Previously, a 'table' section
            # whose entries all had empty selectors left result_list unbound and
            # the resulting NameError discarded the scraped data.
            result_list = [result]
            for table in scraping.get('table') or []:
                if table.get('selector', '').strip() != '':
                    table.update({
                        'result': result,
                        'verbosity': verbosity
                    })
                    # NOTE(review): with several tables only the rows of the last
                    # one are kept, although headers of all tables are recorded -
                    # confirm whether rows should be accumulated instead.
                    table_headers, result_list = selector.extract_tabular(**table)
                    self._register_headers(tabular_data_headers, table_headers)

            if not scraping.get('next'):
                results['data'].extend(result_list)
            else:
                for nextx in scraping['next']:
                    for tdh, r in traverse_next(selector, nextx, result, verbosity=verbosity):
                        results['data'].append(r)
                        self._register_headers(tabular_data_headers, tdh)
        except KeyboardInterrupt:
            # Deliberate: an interrupted run still saves what was collected.
            pass
        except Exception as e:
            print(e)
        finally:
            self._write_results(results, tabular_data_headers)
            if self.args['--verbosity'] > 0:
                print()
                print(Back.WHITE + Fore.RED + self.args['<output_filename>'], \
                      ".", self.args['--output_type'], " has been created" \
                      + Back.RESET + Fore.RESET, sep="")

    @staticmethod
    def _register_headers(known_headers, headers):
        """
        Record each not-yet-seen header in ``known_headers``, mapping it to its
        first-seen position so the CSV columns keep a stable order.
        """
        for header in headers:
            if header not in known_headers:
                known_headers[header] = len(known_headers)

    def _write_results(self, results, tabular_data_headers):
        """
        Write ``results`` to ``<output_filename>.json`` or ``<output_filename>.csv``
        in the current working directory, depending on ``--output_type``.
        Any other output type writes nothing.
        """
        output_type = self.args['--output_type']
        output_base = os.path.join(os.getcwd(), self.args['<output_filename>'])
        if output_type == 'json':
            import json
            with open(output_base + '.json', 'w') as f:
                json.dump(results, f, indent=4)
        elif output_type == 'csv':
            import csv
            with open(output_base + '.csv', 'w') as f:
                fields = extract_fieldnames(self.config)
                # Tabular headers follow the configured fields, in first-seen order.
                data_headers = sorted(tabular_data_headers,
                                      key=lambda x: tabular_data_headers[x])
                fields.extend(data_headers)
                writer = csv.DictWriter(f, fieldnames=fields)
                writer.writeheader()
                writer.writerows(results['data'])