# okfn-brasil/serenata-de-amor
# research/src/fetch_sex_places.py

import asyncio
import json
import logging
import math
import os
import sys
from argparse import ArgumentParser
from csv import DictWriter
from datetime import date
from io import StringIO
from itertools import chain
from pathlib import Path
from urllib.parse import urlencode

import aiofiles
import aiohttp
import pandas as pd
from decouple import config
from geopy.distance import geodesic


DTYPE = dict(cnpj=str, cnpj_cpf=str)  # read CNPJ columns as strings
LOG_FORMAT = '[%(levelname)s] %(asctime)s: %(message)s'

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)


class GooglePlacesURL:

    BASE_URL = 'https://maps.googleapis.com/maps/api/place/'

    def __init__(self, key):
        self.key = key

    def url(self, endpoint, query=None, format='json'):
        """
        :param endpoint: (str) Google Places API endpoint name (e.g. details)
        :param query: (tuple) tuple of key/value pairs for the URL query
        :param format: (str) output format (default is `json`)
        :return: (str) URL to do an authenticated Google Places request
        """
        key = ('key', self.key)
        query = tuple(chain(query, (key,))) if query else (key,)
        parts = (
            self.BASE_URL,
            endpoint,
            '/{}?'.format(format),
            urlencode(query)
        )
        return ''.join(parts)
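
    # For example, assuming the key is 'MY_KEY' (hypothetical), calling
    # url('details', (('placeid', '42'),)) builds:
    # https://maps.googleapis.com/maps/api/place/details/json?placeid=42&key=MY_KEY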

    def details(self, place):
        """
        :param place: (int or str) ID of the place in Google Places
        :return: (str) URL to do a place details Google Places search
        """
        query = (('placeid', place),)
        return self.url('details', query)

    def nearby(self, keyword, location):
        """
        :param keyword: (str) category of place to search for
        :param location: (str) latitude,longitude pair to center the search
        :return: (str) URL to do a nearby Google Places search
        """
        query = (
            ('location', location),
            ('keyword', keyword),
            ('rankby', 'distance'),
        )
        return self.url('nearbysearch', query)


class SexPlacesNearBy:

    KEYWORDS = ('acompanhantes',
                'adult entertainment club',
                'adult entertainment store',
                'gay sauna',
                'massagem',
                'modeling agency',
                'motel',
                'night club',
                'sex club',
                'sex shop',
                'strip club',
                'swinger clubs')
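
    # Each company triggers one nearby search per keyword above (12 requests)
    # plus one place details request, hence the "13 requests per company"
    # figure used to size the semaphore in main_coro().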

    def __init__(self, company, key=None):
        """
        :param company: (dict) Company with name, cnpj, latitude and longitude
        :param key: (str) Google Places API key
        """
        self.url = GooglePlacesURL(key or config('GOOGLE_API_KEY'))

        self.company = company
        self.latitude = self.company['latitude']
        self.longitude = self.company['longitude']
        self.places = []
        self.closest = None

    @property
    def company_name(self):
        return self.company.get('trade_name') or self.company.get('name')

    @property
    def valid(self):
        # convert eagerly so a bad value raises inside the try block
        try:
            coords = tuple(float(coord)
                           for coord in (self.latitude, self.longitude))
        except (TypeError, ValueError):
            return False

        return not any(math.isnan(coord) for coord in coords)

    async def get_closest(self):
        """
        Start the requests to store one place per keyword from self.KEYWORDS
        in self.places. Then get the closest place found, query for its
        details and return a dict with the details for that place.
        """
        if not self.valid:
            msg = 'No geolocation information for company: {} ({})'
            logging.info(msg.format(self.company_name, self.company['cnpj']))
            return None

        tasks = [self.load_place(k) for k in self.KEYWORDS]
        await asyncio.gather(*tasks)
        ordered = sorted(self.places, key=lambda x: x['distance'])

        for place in ordered:
            place = await self.load_details(place)
            name = place.get('name', '').lower()

            # Google returns hotels when looking for a motel; skip those
            if place.get('keyword') == 'motel' and 'hotel' in name:
                continue

            prefix = '💋 ' if place.get('distance') < 5 else ''
            msg = '{}Found something interesting {:.2f}m away from {}…'
            args = (prefix, place.get('distance'), self.company_name)
            logging.info(msg.format(*args))
            self.closest = place
            return place

    async def load_place(self, keyword, print_status=False):
        """
        Given a keyword it loads the place returned by the API to self.places.
        """
        if print_status:
            msg = 'Looking for a {} near {} ({})…'
            args = (keyword, self.company_name, self.company.get('cnpj'))
            logging.info(msg.format(*args))

        location = '{},{}'.format(self.latitude, self.longitude)
        url = self.url.nearby(keyword, location)
        try:
            async with aiohttp.request('GET', url) as response:
                content = await response.text()
        except asyncio.TimeoutError:
            logging.info('Timeout raised for {}'.format(url))
        else:
            place = self.parse(keyword, content)
            if place and isinstance(place.get('distance'), float):
                self.places.append(place)

    def parse(self, keyword, content):
        """
        Return a dictionary with information of the nearest sex place
        around a given company.
        :param keyword: (str) keyword used by the request
        :param content: (str) content of the response to the request
        :return: (dict) with
            * name : The name of nearest sex place
            * latitude : The latitude of nearest sex place
            * longitude : The longitude of nearest sex place
            * distance : Distance (in meters) between the company and the
              nearest sex place
            * address : The address of the nearest sex place
            * phone : The phone of the nearest sex place
            * id : The Google Place ID of the nearest sex place
            * keyword : term that matched the sex place in Google Place Search

        Google responses:
            * `OK` indicates that no errors occurred; the place was
              successfully detected and at least one result was returned.
            * `UNKNOWN_ERROR` indicates a server-side error; trying again may
              be successful.
            * `ZERO_RESULTS` indicates that the reference was valid but no
              longer refers to a valid result. This may occur if the
              establishment is no longer in business.
            * `OVER_QUERY_LIMIT` indicates that you are over your quota.
            * `REQUEST_DENIED` indicates that your request was denied,
              generally because the key parameter is missing or invalid.
            * `INVALID_REQUEST` generally indicates that the query (reference)
              is missing.
            * `NOT_FOUND` indicates that the referenced location was not found
              in the Places database.

        Source: https://developers.google.com/places/web-service/details
        """
        response = json.loads(content)
        status = response.get('status')

        if status != 'OK':
            if status in ('OVER_QUERY_LIMIT', 'REQUEST_DENIED'):
                shutdown()  # reached API limit or API key is missing/wrong

            if status != 'ZERO_RESULTS':
                error = response.get('error', '')
                msg = 'Google Places API Status: {} {}'.format(status, error)
                logging.info(msg.strip())

            return None

        place, *_ = response.get('results', [{}])

        location = place.get('geometry', {}).get('location', {})
        latitude = float(location.get('lat'))
        longitude = float(location.get('lng'))

        company_location = (self.latitude, self.longitude)
        place_location = (latitude, longitude)
        distance = geodesic(company_location, place_location)

        return {
            'id': place.get('place_id'),
            'keyword': keyword,
            'latitude': latitude,
            'longitude': longitude,
            'distance': distance.meters,
            'cnpj': self.company.get('cnpj'),
            'company_name': self.company.get('name'),
            'company_trade_name': self.company.get('trade_name')
        }
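
    # parse() expects a Google Places nearby-search payload shaped roughly
    # like this (truncated illustration, not a real response):
    #
    #     {"status": "OK",
    #      "results": [{"place_id": "...",
    #                   "geometry": {"location": {"lat": -23.5, "lng": -46.6}}}]}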

    async def load_details(self, place):
        """
        :param place: dictionary with id key.
        :return: dictionary updated with name, address and phone.
        """
        place_id = place.get('id')
        if not place_id:
            return place

        # request place details
        url = self.url.details(place_id)
        try:
            async with aiohttp.request('GET', url) as response:
                content = await response.text()
        except asyncio.TimeoutError:
            logging.info('Timeout raised for {}'.format(url))
            return place

        # parse place details
        try:
            details = json.loads(content)
        except ValueError:
            return place
        else:
            if not details:
                return place

        result = details.get('result', {})
        place.update(dict(
            name=result.get('name', ''),
            address=result.get('formatted_address', ''),
            phone=result.get('formatted_phone_number', '')
        ))
        return place
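
    # load_details() reads a Place Details payload along these lines
    # (truncated illustration, not a real response):
    #
    #     {"result": {"name": "Some Place",
    #                 "formatted_address": "123 Example St",
    #                 "formatted_phone_number": "(11) 1234-5678"}}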


async def write_to_csv(path, place=None, **kwargs):
    """
    Receives a given place (dict) and writes it in CSV format to path.
    CSV headers are defined in `fields`. The keyword argument `headers`
    (bool) determines whether the function writes the CSV header or not.
    """
    headers = kwargs.get('headers', False)
    if not place and not headers:
        return

    fields = (
        'id', 'keyword', 'latitude', 'longitude', 'distance', 'name',
        'address', 'phone', 'cnpj', 'company_name', 'company_trade_name'
    )

    with StringIO() as obj:
        writer = DictWriter(obj, fieldnames=fields, extrasaction='ignore')
        if headers:
            writer.writeheader()
        if place:
            writer.writerow(place)

        async with aiofiles.open(path, mode='a') as fh:
            await fh.write(obj.getvalue())
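
# A minimal usage sketch (path and place dict are hypothetical):
#
#     await write_to_csv('out.csv', headers=True)              # header only
#     await write_to_csv('out.csv', {'id': 'x', 'name': 'y'})  # append a row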


async def fetch_place(company, output, semaphore):
    """
    Gets a company (dict), finds the closest place nearby and write the result
    to a CSV file.
    """
    async with semaphore:
        places = SexPlacesNearBy(company)
        await places.get_closest()
        if places.closest:
            await write_to_csv(output, places.closest)


async def main_coro(companies, output, max_requests):
    """
    :param companies: (Pandas DataFrame)
    :param output: (str) Path to the CSV output
    :param max_requests: (int) max parallel requests
    """
    # write CSV headers
    if is_new_dataset(output) and not companies.empty:
        await write_to_csv(output, headers=True)

    semaphore = asyncio.Semaphore(max_requests // 13)  # 13 reqs per company
    tasks = []
    logging.info("Let's get started!")

    # write CSV data
    for company_row in companies.itertuples(index=True):
        company = dict(company_row._asdict())  # _asdict() returns OrderedDict
        tasks.append(fetch_place(company, output, semaphore))

    await asyncio.gather(*tasks)


def find_newest_file(pattern='*.*', source_dir='.'):
    """
    Assuming that the files are named in the form of
    yyyy-mm-dd-type-of-file.xz, find the newest file based on the date:
    ISO dates sort lexicographically, so the last file of the sorted
    glob is the most recent one.
    """
    files = sorted(Path(source_dir).glob(pattern))
    if not files:
        return None

    return str(files[-1])
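
# For example, with data/2017-01-01-companies.xz and
# data/2017-05-30-companies.xz on disk (hypothetical files),
# find_newest_file('*-companies.xz', 'data') returns the May file.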


def load_newest_dataset(pattern, usecols, na_value=''):
    filepath = find_newest_file(pattern)
    if not filepath:
        return None

    logging.info('Loading {}'.format(filepath))
    dataset = pd.read_csv(
        filepath,
        dtype=DTYPE,
        low_memory=False,
        usecols=usecols
    )
    dataset = dataset.fillna(value=na_value)
    return dataset


def get_companies(companies_path, **kwargs):
    """
    Compares YYYY-MM-DD-companies.xz with the newest
    YYYY-MM-DD-sex-place-distances.xz and returns a DataFrame with only
    the rows matching the search criteria, excluding already fetched companies.

    Keyword arguments are expected: term (int), value (float) and city (str)
    """
    filters = tuple(map(kwargs.get, ('term', 'value', 'city')))
    if not all(filters):
        raise TypeError('get_companies expects term, value and city as kwargs')
    term, value, city = filters

    # load companies
    cols = ('cnpj', 'trade_name', 'name', 'latitude', 'longitude', 'city')
    companies = load_newest_dataset(companies_path, cols)
    companies['cnpj'] = companies['cnpj'].str.replace(r'\D', '', regex=True)

    # load & filter reimbursements
    cols = ('total_net_value', 'cnpj_cpf', 'term')
    reimbursements = load_newest_dataset('data/*-reimbursements.xz', cols)
    query = '(term == {}) & (total_net_value >= {})'.format(term, value)
    reimbursements = reimbursements.query(query)

    # merge companies with reimbursements & filter by city
    on = dict(left_on='cnpj', right_on='cnpj_cpf')
    companies = pd.merge(companies, reimbursements, **on)
    del reimbursements
    companies.drop_duplicates('cnpj', inplace=True)
    companies = companies[companies['city'].str.upper() == city.upper()]

    # clean up companies
    columns = ['cnpj_cpf', 'term', 'total_net_value', 'city']
    companies = companies.drop(columns=columns)

    # load sexplaces & filter remaining companies
    cols = ('cnpj', )
    sex_places = load_newest_dataset('data/*-sex-place-distances.xz', cols)
    if sex_places is None or sex_places.empty:
        return companies

    return companies[~companies.cnpj.isin(sex_places.cnpj)]


def is_new_dataset(output):
    sex_places = find_newest_file('*sex-place-distances.xz', 'data')
    if not sex_places:
        return True

    # convert previous database from xz to csv
    pd.read_csv(sex_places, dtype=DTYPE).to_csv(output, index=False)
    os.remove(sex_places)
    return False


def convert_to_lzma(csv_output, xz_output):
    uncompressed = pd.read_csv(csv_output, dtype=DTYPE)
    uncompressed.to_csv(xz_output, compression='xz', index=False)
    os.remove(csv_output)


def shutdown():
    """
    Something is wrong and it is not worth keeping the script running. This
    function cancels all pending coroutines so the script can wrap up data
    collection and move on (that is to say, compress the output we've got so
    far as an .xz file).
    """
    for task in asyncio.all_tasks():
        task.cancel()


def main(companies_path, max_requests=500, sample_size=None, filters=None):
    if not filters:
        filters = dict()

    if not os.path.exists(companies_path):
        logging.info('File not found: {}'.format(companies_path))
        return

    # set file paths
    directory = os.path.dirname(os.path.abspath(companies_path))
    name = '{}-sex-place-distances.{}'
    today = date.today().strftime('%Y-%m-%d')
    csv_output = os.path.join(directory, name.format(today, 'csv'))
    xz_output = os.path.join(directory, name.format(today, 'xz'))

    # get companies
    companies = get_companies(companies_path, **filters)
    if sample_size:
        companies = companies.sample(sample_size)

    # check we have data to work on
    if companies.empty:
        logging.info('Nothing to fetch.')
        return None

    # run
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(main_coro(companies, csv_output, max_requests))
    except asyncio.CancelledError:
        logging.debug('All async tasks were stopped')
    finally:
        convert_to_lzma(csv_output, xz_output)


if __name__ == '__main__':
    description = 'Fetch the closest sex place to each company.'
    parser = ArgumentParser(description=description)
    parser.add_argument('companies_path', help='Companies .xz dataset')
    parser.add_argument(
        '--max-parallel-requests', '-r', type=int, default=500,
        help='Max parallel requests (default: 500)'
    )
    parser.add_argument(
        '--sample-size', '-s', type=int, default=None,
        help='Limit fetching to a given sample size (default: None)'
    )
    parser.add_argument(
        '--city', '-c', required=True,
        help='Limit fetching to a given city (required)'
    )
    parser.add_argument(
        '--min-value', '-m', type=float, required=True,
        help='Limit fetching to expenses above a minimum amount (required)'
    )
    parser.add_argument(
        '--term', '-t', type=int, required=True,
        help='Limit fetching to a given term (required)'
    )
    args = parser.parse_args()

    # max parallel requests must be at least 13 (requests needed per company)
    if args.max_parallel_requests < 13:
        args.max_parallel_requests = 13

    main(
        args.companies_path,
        args.max_parallel_requests,
        args.sample_size,
        dict(term=args.term, city=args.city, value=args.min_value)
    )
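
# Example invocation (the dataset file name is illustrative):
#
#     python research/src/fetch_sex_places.py data/2017-05-21-companies.xz \
#         --city 'Sao Paulo' --min-value 100 --term 2015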