IRC-SPHERE/HyperStream

View on GitHub
examples/tutorial_5_script.py

Summary

Maintainability
D
2 days
Test Coverage
import sys
sys.path.append("../") # Add parent dir in the Path

from hyperstream import HyperStream, Workflow
from hyperstream import TimeInterval
from hyperstream.utils import UTC
import hyperstream

from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
from dateutil.parser import parse

hs = HyperStream(loglevel=30)
print(hs)
print([p.channel_id_prefix for p in hs.config.plugins])

def dateparser(dt):
    return parse(dt.replace('M', '-')).replace(tzinfo=UTC)

ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),
                      datetime(2013, 1, 1).replace(tzinfo=UTC))
ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),
                         datetime(2007, 3, 1).replace(tzinfo=UTC))

# M will be the Memory Channel
M = hs.channel_manager.memory

countries_dict = {
    'Asia': ['Bangkok', 'HongKong', 'KualaLumpur', 'NewDelhi', 'Tokyo'],
    'Australia': ['Brisbane', 'Canberra', 'GoldCoast', 'Melbourne',  'Sydney'],
    'NZ': ['Auckland', 'Christchurch', 'Dunedin', 'Hamilton','Wellington'],
    'USA': ['Chicago', 'Houston', 'LosAngeles', 'NY', 'Seattle']
}

# delete_plate requires the deletion to be first childs and then parents
for plate_id in ['C.C', 'C']:
    if plate_id in [plate[0] for plate in hs.plate_manager.plates.items()]:
        print('Deleting plate ' + plate_id)
        hs.plate_manager.delete_plate(plate_id=plate_id, delete_meta_data=True)

for country in countries_dict:
    id_country = 'country_' + country
    if not hs.plate_manager.meta_data_manager.contains(identifier=id_country):
        hs.plate_manager.meta_data_manager.insert(
            parent='root', data=country, tag='country', identifier=id_country)
    for city in countries_dict[country]:
        id_city = id_country + '.' + 'city_' + city
        if not hs.plate_manager.meta_data_manager.contains(identifier=id_city):
            hs.plate_manager.meta_data_manager.insert(
                parent=id_country, data=city, tag='city', identifier=id_city)

C = hs.plate_manager.create_plate(plate_id="C", description="Countries", values=[], complement=True,
                                  parent_plate=None, meta_data_id="country")
CC = hs.plate_manager.create_plate(plate_id="C.C", description="Cities", values=[], complement=True,
                                   parent_plate="C", meta_data_id="city")


print hs.plate_manager.meta_data_manager.global_plate_definitions

from hyperstream import Workflow

# parameters for the csv_mutli_reader tool
csv_temp_params = dict(
    filename_template='data/TimeSeriesDatasets_130207/Temp{}.csv',
    datetime_parser=dateparser, skip_rows=0, header=True)

csv_rain_params = dict(
    filename_template='data/TimeSeriesDatasets_130207/{}Rainfall.csv',
    datetime_parser=dateparser, skip_rows=0, header=True)

def mean(x):
    x = [value for value in x if value is not None]
    return float(sum(x)) / max(len(x), 1)

def dict_mean(d):
    x = d.values()
    x = [value for value in x if value is not None]
    return float(sum(x)) / max(len(x), 1)

def split_temperatures(d):
    new_d = {}
    for name, value in d.iteritems():
        key = name[-3:].lower()
        name = name[:-3]
        if name not in new_d:
            new_d[name] = {}
        new_d[name][key] = value
    return new_d

with Workflow(workflow_id='tutorial_05',
              name='tutorial_05',
              owner='tutorials',
              description='Tutorial 5 workflow',
              online=False) as w:

    country_node_raw_temp = w.create_node(stream_name='raw_temp_data', channel=M, plates=[C])
    country_node_temp = w.create_node(stream_name='temp_data', channel=M, plates=[C])
    city_node_temp = w.create_node(stream_name='city_temp', channel=M, plates=[CC])
    city_node_avg_temp = w.create_node(stream_name='city_avg_temp', channel=M, plates=[CC])
    country_node_avg_temp = w.create_node(stream_name='country_avg_temp', channel=M, plates=[C])

    country_node_raw_rain = w.create_node(stream_name='raw_rain_data', channel=M, plates=[C])
    city_node_rain = w.create_node(stream_name='city_rain', channel=M, plates=[CC])
    country_node_avg_rain = w.create_node(stream_name='country_avg_rain', channel=M, plates=[C])

    city_node_temp_rain = w.create_node(stream_name='city_temp_rain', channel=M, plates=[CC])
    country_node_avg_temp_rain = w.create_node(stream_name='country_avg_temp_rain', channel=M, plates=[C])

    world_node_avg_temp = w.create_node(stream_name='world_avg_temp',
                                        channel=M, plates=[])

    for c in C:
        country_node_raw_temp[c] = hs.plugins.data_importers.factors.csv_multi_reader(
                source=None, **csv_temp_params)
        country_node_temp[c] = hs.factors.apply(
                sources=[country_node_raw_temp[c]],
                func=split_temperatures)

        country_node_raw_rain[c] = hs.plugins.data_importers.factors.csv_multi_reader(
                source=None, **csv_rain_params)
        for cc in CC[c]:
            city_node_temp[cc] = hs.factors.splitter_from_stream(
                                    source=country_node_temp[c],
                                    splitting_node=country_node_temp[c],
                                    use_mapping_keys_only=True)
            city_node_avg_temp[cc] = hs.factors.apply(
                                    sources=[city_node_temp[c]],
                                    func=dict_mean)

            city_node_rain[cc] = hs.factors.splitter_from_stream(
                                    source=country_node_raw_rain[c],
                                    splitting_node=country_node_raw_rain[c],
                                    use_mapping_keys_only=True)

            city_node_temp_rain[cc] = hs.plugins.example.factors.aligned_correlation(
                                    sources=[city_node_avg_temp[cc],
                                             city_node_rain[cc]],
                                    use_mapping_keys_only=True)
        country_node_avg_temp[c] = hs.factors.aggregate(
                                    sources=[city_node_avg_temp],
                                    alignment_node=None,
                                    aggregation_meta_data='city', func=mean)
        country_node_avg_rain[c] = hs.factors.aggregate(
                                    sources=[city_node_rain],
                                    alignment_node=None,
                                    aggregation_meta_data='city', func=mean)
        country_node_avg_temp_rain[c] = hs.factors.aggregate(
                                    sources=[city_node_temp_rain],
                                    alignment_node=None,
                                    aggregation_meta_data='city', func=mean)
    world_node_avg_temp[None] = hs.factors.aggregate(sources=[country_node_avg_temp],
                                           alignment_node=None,
                                           aggregation_meta_data='country',
                                           func=mean)
    w.execute(ti_all)

print("\n#### Printing city node temperatures ####")
for stream_id, stream in M.find_streams(name='temp_data').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print("\n#### Printing city node temperatures ####")
for stream_id, stream in M.find_streams(name='city_temp').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print("\n#### Printing city node temp/rain ####")
for stream_id, stream in M.find_streams(name='city_temp_rain').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print("\n#### Printing country node avg temperatures ####")
for stream_id, stream in M.find_streams(name='country_avg_temp').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print("\n#### Printing country node avg rainfall ####")
for stream_id, stream in M.find_streams(name='country_avg_rain').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print("\n#### Printing world node avg temperatures   ####")
for stream_id, stream in M.find_streams(name='world_avg_temp').iteritems():
    print stream_id
    print [instance.value for instance in stream.window(ti_sample).items()]

print(w.to_json(w.factorgraph_viz, tool_long_names=False, indent=4))
from pprint import pprint
pprint(w.to_dict(tool_long_names=False))