# myems-aggregation/energy_storage_power_station_energy_discharge.py
#
# Summary
#
# Maintainability
# F
# 3 wks
# Test Coverage
import random
import time
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from multiprocessing import Pool

import mysql.connector

import config


########################################################################################################################
# PROCEDURES
# Step 1: get all energy storage power stations
# Step 2: Create multiprocessing pool to call worker in parallel
########################################################################################################################


def main(logger):
    """Run the discharge aggregation cycle for all energy storage power stations, forever.

    Each cycle reads the id and name of every energy storage power station
    from the system database, shuffles the list, and maps ``worker`` over it
    in a multiprocessing pool of ``config.pool_size`` processes. Every
    non-empty error string returned by a worker is written to ``logger``.

    The loop sleeps 60 seconds and starts over after a system database error
    or when no station exists, and sleeps 300 seconds after a completed cycle.
    There is no exit condition; the function only ends if an exception
    propagates out of it.

    Args:
        logger: object providing ``error(message)``; used for all error reports.
    """
    while True:
        # the outermost while loop
        ################################################################################################################
        # Step 1: get all energy storage power stations
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of energy_storage_power_station_energy_discharge.main " + str(e))
            # the connection may exist even though creating the cursor failed
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        print("Connected to MyEMS System Database")

        energy_storage_power_station_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name "
                                     " FROM tbl_energy_storage_power_stations "
                                     " ORDER BY id ")
            rows_energy_storage_power_stations = cursor_system_db.fetchall()

            if not rows_energy_storage_power_stations:
                print("There isn't any energy storage power stations ")
                # sleep and continue the outer loop to query the database again
                time.sleep(60)
                continue

            energy_storage_power_station_list = [{"id": row[0], "name": row[1]}
                                                 for row in rows_energy_storage_power_stations]

        except Exception as e:
            logger.error("Error in step 1.2 of energy_storage_power_station_energy_discharge.main " + str(e))
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        finally:
            # runs on every path above, including both `continue` statements
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        print("Got all energy storage power stations in MyEMS System Database")

        # shuffle the list so that the stations are processed in a different order on every cycle
        random.shuffle(energy_storage_power_station_list)

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        pool = Pool(processes=config.pool_size)
        try:
            error_list = pool.map(worker, energy_storage_power_station_list)
        finally:
            # map() re-raises an exception raised inside a worker; shut the pool down
            # in that case too so that its processes are not left behind
            pool.close()
            pool.join()

        for error in error_list:
            # workers return None on success and an error string on failure
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep 300 seconds...")
        time.sleep(300)
        print("wake from sleep, and continue to work...")
    # end of outer while


########################################################################################################################
# PROCEDURES:
#   Step 1: get all energy storage containers associated with the energy storage power station
#   Step 2: determine start datetime and end datetime to aggregate
#   Step 3: for each energy storage container in list, get energy discharge data from energy database
#   Step 4: determine common time slot to aggregate
#   Step 5: aggregate energy data in the common time slot by energy categories and hourly
#   Step 6: save energy data to energy database
#
# NOTE: returns None on success or an error string on failure, because the logger object cannot be passed in as a
#       parameter
########################################################################################################################

def worker(energy_storage_power_station):
    """Aggregate the hourly discharge energy of one energy storage power station.

    Reads the containers associated with the station from the system database,
    reads each container's rows from tbl_energy_storage_container_discharge_hourly
    in the energy database, sums them per time slot and inserts the sums into
    tbl_energy_storage_power_station_discharge_hourly in batches of 100 rows.

    The time range starts at ``config.start_datetime_utc`` (truncated to the
    hour), or one ``config.minutes_to_count`` step after the latest row already
    saved for the station, and ends at the current UTC minute.

    Args:
        energy_storage_power_station: dict; its ``id`` entry identifies the station.

    Returns:
        None when the station has no containers, when at least one container
        has no rows in the time range, or when all aggregated rows were saved.
        Otherwise the error string of the step that failed (the string is also
        printed).
    """
    ####################################################################################################################
    # Step 1: get all energy storage containers associated with the energy storage power station
    ####################################################################################################################
    cnx_system_db = None
    cursor_system_db = None
    try:
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            return _report_error("1.1", e)

        print("Step 1: get all energy storage containers associated with the energy storage power station")
        try:
            cursor_system_db.execute(" SELECT e.id, e.name "
                                     " FROM tbl_energy_storage_containers e, "
                                     "      tbl_energy_storage_power_stations_containers ec "
                                     " WHERE e.id = ec.energy_storage_container_id "
                                     "       AND ec.energy_storage_power_station_id = %s ",
                                     (energy_storage_power_station['id'],))
            rows_energy_storage_containers = cursor_system_db.fetchall()
            energy_storage_container_list = [{"id": row[0], "name": row[1]}
                                             for row in rows_energy_storage_containers or ()]
        except Exception as e:
            return _report_error("1", e)
    finally:
        # single cleanup point for the system database, reached on every path above
        _close_db(cursor_system_db, cnx_system_db)

    ####################################################################################################################
    # stop to the next energy storage power station if this energy storage power station is empty
    ####################################################################################################################
    if not energy_storage_container_list:
        print("This is an empty energy storage power station ")
        return None

    ####################################################################################################################
    # Step 2: determine start datetime and end datetime to aggregate
    ####################################################################################################################
    print("Step 2: determine start datetime and end datetime to aggregate")
    cnx_energy_db = None
    cursor_energy_db = None
    try:
        try:
            cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
            cursor_energy_db = cnx_energy_db.cursor()
        except Exception as e:
            return _report_error("2.1", e)

        try:
            query = (" SELECT MAX(start_datetime_utc) "
                     " FROM tbl_energy_storage_power_station_discharge_hourly "
                     " WHERE energy_storage_power_station_id = %s ")
            cursor_energy_db.execute(query, (energy_storage_power_station['id'],))
            row_datetime = cursor_energy_db.fetchone()
            start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
            start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)

            if row_datetime and isinstance(row_datetime[0], datetime):
                # replace second and microsecond with 0
                # note: do not replace minute in case of calculating in half hourly
                start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
                # start from the next time slot
                start_datetime_utc += timedelta(minutes=config.minutes_to_count)

            # naive UTC "now", truncated to the minute (same value datetime.utcnow() gave)
            end_datetime_utc = datetime.now(timezone.utc).replace(second=0, microsecond=0, tzinfo=None)

            print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
                  + " end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
        except Exception as e:
            return _report_error("2.2", e)

        ################################################################################################################
        # Step 3: for each energy storage container in list, get energy discharge data from energy database
        ################################################################################################################
        # maps str(container id) -> {start_datetime_utc: actual_value}, or None when the container has no rows
        energy_energy_storage_container_hourly = dict()
        try:
            query = (" SELECT start_datetime_utc, actual_value "
                     " FROM tbl_energy_storage_container_discharge_hourly "
                     " WHERE energy_storage_container_id = %s "
                     "       AND start_datetime_utc >= %s "
                     "       AND start_datetime_utc < %s "
                     " ORDER BY start_datetime_utc ")
            for energy_storage_container in energy_storage_container_list:
                energy_storage_container_id = str(energy_storage_container['id'])
                cursor_energy_db.execute(query, (energy_storage_container_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if not rows_energy_values:
                    energy_energy_storage_container_hourly[energy_storage_container_id] = None
                else:
                    energy_energy_storage_container_hourly[energy_storage_container_id] = \
                        {row_value[0]: row_value[1] for row_value in rows_energy_values}
        except Exception as e:
            return _report_error("3", e)

        ################################################################################################################
        # Step 4: determine common time slot to aggregate
        ################################################################################################################
        print("Getting common time slot of energy values for all energy storage containers...")
        common_start_datetime_utc, common_end_datetime_utc = _common_time_slot(
            start_datetime_utc, end_datetime_utc, energy_energy_storage_container_hourly)

        if common_start_datetime_utc is None or common_end_datetime_utc is None:
            # at least one container has no rows in the range, so nothing can be aggregated
            print("There isn't any energy data")
            # continue the for energy storage power station loop to the next energy storage power station
            print("continue the for energy storage power station loop to the next energy storage power station")
            return None

        print("common_start_datetime_utc: " + str(common_start_datetime_utc))
        print("common_end_datetime_utc: " + str(common_end_datetime_utc))

        ################################################################################################################
        # Step 5: aggregate energy data in the common time slot by energy categories and hourly
        ################################################################################################################
        print("Step 5: aggregate energy data in the common time slot by energy categories and hourly")
        try:
            aggregated_values = _aggregate_hourly(energy_storage_container_list,
                                                  energy_energy_storage_container_hourly,
                                                  common_start_datetime_utc,
                                                  common_end_datetime_utc,
                                                  timedelta(minutes=config.minutes_to_count))
        except Exception as e:
            return _report_error("5", e)

        ################################################################################################################
        # Step 6: save energy data to energy database
        ################################################################################################################
        print("Step 6: save energy data to energy database")
        insert_header = (" INSERT INTO tbl_energy_storage_power_station_discharge_hourly "
                         "             (energy_storage_power_station_id, "
                         "              start_datetime_utc, "
                         "              actual_value) "
                         " VALUES  ")
        # insert and commit 100 rows at a time
        for offset in range(0, len(aggregated_values), 100):
            insert_100 = aggregated_values[offset:offset + 100]
            try:
                add_values = insert_header + "".join(
                    " (" + str(energy_storage_power_station['id']) + ","
                    + "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
                    + str(aggregated_value['actual_value']) + "), "
                    for aggregated_value in insert_100)
                print("add_values:" + add_values)
                # trim ", " at the end of string and then execute
                cursor_energy_db.execute(add_values[:-2])
                cnx_energy_db.commit()
            except Exception as e:
                return _report_error("6.1", e)

        return None
    finally:
        # single cleanup point for the energy database, reached on every return and on any exception
        _close_db(cursor_energy_db, cnx_energy_db)


def _report_error(step, error):
    """Build the worker error string for ``step``, print it and return it."""
    error_string = ("Error in step " + step + " of energy_storage_power_station_energy_discharge.worker "
                    + str(error))
    print(error_string)
    return error_string


def _close_db(cursor, cnx):
    """Close the cursor and then the connection, skipping whichever one was never created."""
    if cursor:
        cursor.close()
    if cnx:
        cnx.close()


def _common_time_slot(start_datetime_utc, end_datetime_utc, hourly_by_container):
    """Narrow the requested range to the span bounded by every container's first and last timestamp.

    Returns ``(start, end)``, or ``(None, None)`` as soon as a container with
    no rows is found. ``start`` may be later than ``end`` when the containers'
    data do not overlap.
    """
    common_start = start_datetime_utc
    common_end = end_datetime_utc
    for energy_hourly in hourly_by_container.values():
        if not energy_hourly:
            return None, None
        common_start = max(common_start, min(energy_hourly))
        common_end = min(common_end, max(energy_hourly))
    return common_start, common_end


def _aggregate_hourly(container_list, hourly_by_container, common_start, common_end, step):
    """Sum the containers' values for every slot from ``common_start`` to ``common_end`` inclusive.

    Returns a list of dicts with the keys ``start_datetime_utc`` and
    ``actual_value``, one per slot, ``step`` apart.
    """
    aggregated_values = list()
    current_datetime_utc = common_start
    while current_datetime_utc <= common_end:
        total = Decimal(0)
        for container in container_list:
            # NOTE(review): a slot missing for any container raises KeyError here, which worker
            # reports as a step 5 error without saving anything - confirm this handling of gaps
            total += hourly_by_container[str(container['id'])][current_datetime_utc]
        aggregated_values.append({'start_datetime_utc': current_datetime_utc, 'actual_value': total})
        current_datetime_utc += step
    return aggregated_values