thundergolfer/mAIcroft
maicroft/social_info_extraction.py

import datetime
# Import the urlparse *function* (it is called directly below), with a
# Python 2/3 compatibility fallback.
try:
    from urlparse import urlparse  # Python 2
except ImportError:
    from urllib.parse import urlparse  # Python 3
import calendar
import pytz
import re

from maicroft.util import Util
from maicroft.activity_metrics_proc import process_metrics
from maicroft.activity_metrics_proc import process_submission_metrics
from maicroft.subreddits import subreddits_dict, ignore_text_subs
from maicroft.text_parser import TextParser

"""
Generalising the information extraction processing of social media content.
Currently only Reddit social content is supported, but we want the code to be
reusable for other major social media content eg. Twitter, Facebook, LinkedIn...
"""

parser = TextParser()
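
# A minimal usage sketch (illustrative only): `user` is assumed to expose
# `corpus`, `sentiments`, and `load_attributes`, and the comment object to
# expose `text`, `created_utc`, and `subreddit`. The namedtuple below is a
# hypothetical stand-in for a real Reddit comment wrapper:
#
#     from collections import namedtuple
#     Comment = namedtuple("Comment", ["text", "created_utc", "subreddit"])
#     comment = Comment("I love my dog.", 1469404800, "aww")
#     process_comment(user, comment)  # True if the text was chunk-processed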


def process_comment(user, comment):
    """
    Process a single comment.

    * Updates metrics
    * Sanitizes and extracts chunks from comment.
    """

    text = Util.sanitize_text(comment.text)  # Sanitize comment text.

    user.corpus += text.lower()  # Add comment text to corpus.

    comment_timestamp = datetime.datetime.fromtimestamp(
        comment.created_utc, tz=pytz.utc
    )

    process_metrics(user, comment)  # Process the comment for metrics

    # If the comment is in a subreddit whose text should be ignored
    # (such as /r/jokes, /r/writingprompts, etc), do not process it further.
    if comment.subreddit in ignore_text_subs:
        return False

    # TODO: Bailing out when "I"/"my" is absent is dodgy behaviour.
    # For now, skip comments that contain neither "I" nor "my", since they
    # are unlikely to describe the author.
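    # e.g. the pattern matches "I think" and "my dog", but not "hi" or
    # "simply" (word boundaries are required).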
    if not re.search(r"\b(i|my)\b", text, re.I):
        return False

    # Now, this is a comment that needs to be processed.
    (chunks, sentiments) = parser.extract_chunks(text)
    user.sentiments += sentiments

    for chunk in chunks:
        user.load_attributes(chunk, comment)

    return True


def process_submission(user, submission):
    """
    Process a single submission.

    * Updates metrics
    * Sanitizes and extracts chunks from user text.
    """

    if submission.is_user:
        text = Util.sanitize_text(submission.text)
        user.corpus += text.lower()

    process_submission_metrics(user, submission)  # add metrics info to user

    submission_type = None
    submission_domain = None
    submission_url_path = urlparse(submission.url).path

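    # Classify the submission (Self / Image / Video / Other) from its
    # domain and URL path, then record the domain count under that type.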
    if submission.domain.startswith("user."):
        submission_type = "Self"
        submission_domain = submission.subreddit
    elif (
        submission_url_path.endswith(tuple(user.IMAGE_EXTENSIONS)) or
        submission.domain.endswith(tuple(user.IMAGE_DOMAINS))
    ):
        submission_type = "Image"
        submission_domain = submission.domain
    elif submission.domain.endswith(tuple(user.VIDEO_DOMAINS)):
        submission_type = "Video"
        submission_domain = submission.domain
    else:
        submission_type = "Other"
        submission_domain = submission.domain
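
    # user.submissions_by_type is assumed to be a d3-style nested dict:
    #   {"children": [{"name": <type>,
    #                  "children": [{"name": <domain>, "size": n}]}]}
    # Find this submission type's node, then bump (or create) the domain
    # leaf beneath it.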
    type_node = next(
        x for x in user.submissions_by_type["children"]
        if x["name"] == submission_type
    )
    domain_node = next(
        (x for x in type_node["children"] if x["name"] == submission_domain),
        None
    )
    if domain_node:
        domain_node["size"] += 1
    else:
        type_node["children"].append({
            "name": submission_domain,
            "size": 1
        })

    # If the submission is in a subreddit whose text should be ignored
    # (such as /r/jokes, /r/writingprompts, etc),
    # do not process it further.
    if submission.subreddit in ignore_text_subs:
        return False

    # Only process self-text submissions whose text contains "I" or "my".
    if not submission.is_user or not re.search(r"\b(i|my)\b", text, re.I):
        return False

    (chunks, sentiments) = parser.extract_chunks(text)
    user.sentiments += sentiments

    for chunk in chunks:
        user.load_attributes(chunk, submission)

    return True


def load_attributes(user, chunk, post_permalink):
    """
    Given an extracted chunk, load the appropriate attributes from it
    onto the user.
    """

    # Is this chunk a possession/belonging?
    if chunk["kind"] == "possession" and chunk["noun_phrase"]:
        # Extract noun from chunk
        noun_phrase = chunk["noun_phrase"]
        noun_phrase_text = " ".join([w for w, t in noun_phrase])
        norm_nouns = " ".join([
            parser.normalize(w, t) for w, t in noun_phrase if t.startswith("N")
        ])

        noun = next(
            (w for w, t in noun_phrase if t.startswith("N")), None
        )
        if noun:
            # See if noun is a pet, family member or a relationship partner
            pet = parser.pet_animal(noun)
            family_member = parser.family_member(noun)
            relationship_partner = parser.relationship_partner(noun)

            if pet:
                user.pets.append((pet, post_permalink))
            elif family_member:
                user.family_members.append((family_member, post_permalink))
            elif relationship_partner:
                user.relationship_partners.append(
                    (relationship_partner, post_permalink)
                )
            else:
                user.possessions_extra.append((norm_nouns, post_permalink))

    # Is this chunk an action?
    elif chunk["kind"] == "action" and chunk["verb_phrase"]:
        verb_phrase = chunk["verb_phrase"]
        verb_phrase_text = " ".join([w for w, t in verb_phrase])

        # Extract verbs, adverbs, etc from chunk
        norm_adverbs = [
            parser.normalize(w, t)
            for w, t in verb_phrase if t.startswith("RB")
        ]
        adverbs = [w.lower() for w, t in verb_phrase if t.startswith("RB")]

        norm_verbs = [
            parser.normalize(w, t)
            for w, t in verb_phrase if t.startswith("V")
        ]
        verbs = [w.lower() for w, t in verb_phrase if t.startswith("V")]

        prepositions = [w for w, t in chunk["prepositions"]]

        noun_phrase = chunk["noun_phrase"]

        noun_phrase_text = " ".join(
            [w for w, t in noun_phrase if t not in ["DT"]]
        )
        norm_nouns = [
            parser.normalize(w, t)
            for w, t in noun_phrase if t.startswith("N")
        ]
        proper_nouns = [w for w, t in noun_phrase if t == "NNP"]
        determiners = [
            parser.normalize(w, t)
            for w, t in noun_phrase if t.startswith("DT")
        ]

        prep_noun_phrase = chunk["prep_noun_phrase"]
        prep_noun_phrase_text = " ".join([w for w, t in prep_noun_phrase])
        pnp_prepositions = [
            w.lower() for w, t in prep_noun_phrase if t in ["TO", "IN"]
        ]
        pnp_norm_nouns = [
            parser.normalize(w, t)
            for w, t in prep_noun_phrase if t.startswith("N")
        ]
        pnp_determiners = [
            parser.normalize(w, t)
            for w, t in prep_noun_phrase if t.startswith("DT")
        ]

        full_noun_phrase = (
            noun_phrase_text + " " + prep_noun_phrase_text
        ).strip()

        # TODO: Handle negative actions (such as "I am not ...");
        # for now, skip chunks containing a negating adverb or determiner:
        if any(
            w in ["never", "no", "not", "nothing"]
            for w in norm_adverbs+determiners
        ):
            return

        # I am/was ...
        if (
            len(norm_verbs) == 1 and "be" in norm_verbs and
            not prepositions and noun_phrase
        ):
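            # Illustrative: "I am a doctor" gives verbs == ["am"],
            # norm_verbs == ["be"], norm_nouns == ["doctor"].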
            # Ignore gerund nouns for now
            if (
                "am" in verbs and
                any(n.endswith("ing") for n in norm_nouns)
            ):
                user.attributes_extra.append(
                    (full_noun_phrase, post_permalink)
                )
                return

            attribute = []
            for noun in norm_nouns:
                gender = None
                orientation = None
                if "am" in verbs:
                    gender = parser.gender(noun)
                    orientation = parser.orientation(noun)
                if gender:
                    user.genders.append((gender, post_permalink))
                elif orientation:
                    user.orientations.append(
                        (orientation, post_permalink)
                    )
                # Include only "am" phrases
                elif "am" in verbs:
                    attribute.append(noun)

            if attribute and (
                (
                    # Include only attributes that end
                    # in predefined list of endings...
                    any(
                        a.endswith(
                            parser.include_attribute_endings
                        ) for a in attribute
                    ) and not (
                        # And exclude...
                        # ...certain lone attributes
                        (
                            len(attribute) == 1 and
                            attribute[0] in parser.skip_lone_attributes and
                            not pnp_norm_nouns
                        )
                        or
                        # ...predefined skip attributes
                        any(a in attribute for a in parser.skip_attributes)
                        or
                        # ...attributes that end in predefined
                        # list of endings
                        any(
                            a.endswith(
                                parser.exclude_attribute_endings
                            ) for a in attribute
                        )
                    )
                ) or
                (
                    # And include special attributes with different endings
                    any(a in attribute for a in parser.include_attributes)
                )
            ):
                user.attributes.append(
                    (full_noun_phrase, post_permalink)
                )
            elif attribute:
                user.attributes_extra.append(
                    (full_noun_phrase, post_permalink)
                )

        # I live(d) in ...
        elif "live" in norm_verbs and prepositions and norm_nouns:
            if any(
                p in ["in", "near", "by"] for p in prepositions
            ) and proper_nouns:
                user.places_lived.append(
                    (
                        " ".join(prepositions) + " " + noun_phrase_text,
                        post_permalink
                    )
                )
            else:
                user.places_lived_extra.append(
                    (
                        " ".join(prepositions) + " " + noun_phrase_text,
                        post_permalink
                    )
                )

        # I grew up in ...
        elif "grow" in norm_verbs and "up" in prepositions and norm_nouns:
            if any(
                p in ["in", "near", "by"] for p in prepositions
            ) and proper_nouns:
                user.places_grew_up.append(
                    (
                        " ".join(
                            [p for p in prepositions if p != "up"]
                        ) + " " + noun_phrase_text,
                        post_permalink
                    )
                )
            else:
                user.places_grew_up_extra.append(
                    (
                        " ".join(
                            [p for p in prepositions if p != "up"]
                        ) + " " + noun_phrase_text,
                        post_permalink
                    )
                )

        elif (
            len(norm_verbs) == 1 and "prefer" in norm_verbs and
            norm_nouns and not determiners and not prepositions
        ):
            user.favorites.append((full_noun_phrase, post_permalink))

        elif norm_nouns:
            actions_extra = " ".join(norm_verbs)
            user.actions_extra.append((actions_extra, post_permalink))


def derive_attributes(user):
    """
    Derives attributes using activity data.
    """

    for name, count in user.commented_subreddits():
        subreddit = subreddits_dict.get(name)
        if (
            subreddit and subreddit["attribute"] and
            count >= user.MIN_THRESHOLD
        ):
            user.derived_attributes[subreddit["attribute"]].append(
                subreddit["value"].lower()
            )

    for name, count in user.submitted_subreddits():
        subreddit = subreddits_dict.get(name)
        if (
            subreddit and subreddit["attribute"] and
            count >= user.MIN_THRESHOLD
        ):
            user.derived_attributes[subreddit["attribute"]].append(
                subreddit["value"].lower()
            )

    # If someone mentions their wife, assume they are male, and vice-versa.
    # TODO: This is faulty logic and should be changed/removed - 25/07/16
    if "wife" in [v for v, s in user.relationship_partners]:
        user.derived_attributes["gender"].append("male")
    elif "husband" in [v for v, s in user.relationship_partners]:
        user.derived_attributes["gender"].append("female")

    commented_dates = sorted(user.commented_dates)
    submitted_dates = sorted(user.submitted_dates)
    active_dates = sorted(user.commented_dates + user.submitted_dates)

    min_date = datetime.datetime(datetime.MINYEAR, 1, 1, tzinfo=pytz.utc)
    first_comment_date = \
        min(commented_dates) if commented_dates else min_date
    first_submission_date = \
        min(submitted_dates) if submitted_dates else min_date

    user.first_post_date = max(first_comment_date, first_submission_date)

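    # Append "now" as a sentinel so the gap between the latest activity and
    # the present is also a candidate inactivity period.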
    active_dates += [datetime.datetime.now(tz=pytz.utc)]
    commented_dates += [datetime.datetime.now(tz=pytz.utc)]
    submitted_dates += [datetime.datetime.now(tz=pytz.utc)]

    # Find the longest period of inactivity
    comment_lurk_period = max(
        [
            {
                "from": calendar.timegm(d1.utctimetuple()),
                "to": calendar.timegm(d2.utctimetuple()),
                "days": (d2 - d1).seconds,
            } for d1, d2 in zip(
                commented_dates[:-1], commented_dates[1:]
            )
        ], key=lambda x: x["days"]
    ) if len(commented_dates) > 1 else {"days": -1}

    submission_lurk_period = max(
        [
            {
                "from": calendar.timegm(d1.utctimetuple()),
                "to": calendar.timegm(d2.utctimetuple()),
                "days": (d2 - d1).seconds,
            } for d1, d2 in zip(
                submitted_dates[:-1], submitted_dates[1:]
            )
        ], key=lambda x: x["days"]
    ) if len(submitted_dates) > 1 else {"days": -1}

    post_lurk_period = max(
        [
            {
                "from": calendar.timegm(d1.utctimetuple()),
                "to": calendar.timegm(d2.utctimetuple()),
                "days": (d2 - d1).seconds,
            } for d1, d2 in zip(
                # compares 1st with 2nd, 2nd with 3rd, 3rd with...
                active_dates[:-1], active_dates[1:]
            )
        ], key=lambda x: x["days"]
    )

    user.lurk_period = min(
        [
            x for x in [
                comment_lurk_period,
                submission_lurk_period,
                post_lurk_period
            ] if x["days"] >= 0
        ],
        key=lambda x: x["days"]
    )
    del user.lurk_period["days"]
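
    # user.lurk_period now looks like, e.g. (illustrative):
    #     {"from": 1451606400, "to": 1454284800}
    # i.e. UTC epoch seconds bounding the chosen inactivity gap.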