sitch/sitchlib/utility.py
"""General utilities."""
# from __future__ import print_function
import datetime
import dateutil.parser as du_parser
import json
import os
import pprint
import psutil
import subprocess
import requests
from location_tool import LocationTool
class Utility:
"""General utility class."""
@classmethod
def dt_delta_in_minutes(cls, dt_1, dt_2):
"""Calculate the delta between two datetime objects, in minutes."""
delta_seconds = abs((dt_1.replace(tzinfo=None) -
dt_2.replace(tzinfo=None)).total_seconds())
retval = delta_seconds / 60
return retval
@classmethod
def generate_base_event(cls):
base_event = {"site_name": os.getenv('LOCATION_NAME', 'SITCH_SITE'),
"sensor_name": os.getenv('RESIN_DEVICE_NAME_AT_INIT',
'NOT_RESIN-MANAGED'),
"sensor_id": os.getenv('HOSTNAME', 'NO_HOSTNAME'),
"event_timestamp": cls.get_now_string(),
"event_type": "base_event"}
return base_event.copy()
@classmethod
def dt_from_iso(cls, iso_time):
"""Exchange an ISO8601-formatted string for a datetime object."""
return du_parser.parse(iso_time)
@classmethod
def epoch_to_iso8601(cls, unix_time):
"""Transform epoch time to ISO8601 format."""
cleaned = float(unix_time)
return datetime.datetime.utcfromtimestamp(cleaned).isoformat()
@classmethod
def get_now_string(cls):
"""Get ISO8601 timestamp for now."""
now = datetime.datetime.now().isoformat()
return now
@classmethod
def get_platform_info(cls):
"""Get information on platform and hardware."""
lshw = "/usr/bin/lshw -json"
try:
raw_response = subprocess.check_output(lshw.split())
platform_info = json.loads(raw_response.replace('\n', ''))
except:
print("Utility: Unable to get platform info from lshw!")
platform_info = {}
return platform_info
@classmethod
def start_component(cls, runcmd):
"""Start a thing."""
try:
subprocess.Popen(runcmd.split())
except KeyError as e:
print(e)
return False
return True
@classmethod
def create_path_if_nonexistent(cls, path):
"""Create filesystem directory path."""
if os.path.exists(path) and os.path.isdir(path):
return
elif os.path.exists(os.path.dirname(path)):
return
os.makedirs(os.path.dirname(path))
print(("Utility: Creating directory: %s") % path)
return
@classmethod
def create_file_if_nonexistent(cls, path, lfile):
"""Create file and path, if it doesn't already exist."""
fullpath = os.path.join(path, lfile)
if os.path.isfile(fullpath):
return
else:
logmsg = "Utility: Creating log file: %s" % fullpath
print(logmsg)
open(fullpath, 'a').close()
return
@classmethod
def write_file(cls, location, contents):
"""Write string to file."""
with open(location, 'w') as fh:
fh.write(contents)
@classmethod
def get_platform_name(cls):
"""Get platform name from lshw output."""
platform_info = Utility.get_platform_info
try:
platform_name = platform_info["product"]
except:
print("Utility: Failed to obtain platform name!")
platform_name = "Unspecified"
return platform_name
@classmethod
def strip_list(cls, raw_struct):
"""Strip contents from single-item list."""
if (type(raw_struct) is list and len(raw_struct)) == 1:
return raw_struct[0]
else:
return raw_struct
@classmethod
def get_public_ip(cls):
"""Get public IP."""
url = 'https://api.ipify.org/?format=json'
try:
result = (requests.get(url).json())['ip']
except requests.exceptions.ConnectionError:
print("Utility: Unable to get public IP from ipify.org. "
"Set to 127.0.0.1")
result = "127.0.0.1"
return result
@classmethod
def calculate_distance(cls, lon_1, lat_1, lon_2, lat_2):
"""Wrap the LocationTool.get_distance_between_points() fn."""
if None in [lon_1, lat_1, lon_2, lat_2]:
print("Utility: Geo coordinate is zero, not resolving distance.")
return 0
pos_1 = (lat_1, lon_1)
pos_2 = (lat_2, lon_2)
dist_in_km = LocationTool.get_distance_between_points(pos_1, pos_2)
dist_in_m = dist_in_km * 1000
return dist_in_m
@classmethod
def str_to_float(cls, s):
"""Change string to float."""
retval = None
try:
retval = float(s)
except:
errmsg = "Utility: Unable to convert %s to float" % str(s)
print(errmsg)
return retval
@classmethod
def heartbeat(cls, service_name):
"""Generate heartbeat message."""
scan = {"scan_program": "heartbeat",
"heartbeat_service_name": service_name,
"timestamp": Utility.get_now_string()}
return scan
@classmethod
def is_valid_json(cls, in_str):
"""Test string for json validity."""
try:
json.loads(in_str)
return True
except:
return False
@classmethod
def pretty_string(cls, structure):
"""Pretty-print lines."""
result = ""
pp = pprint.PrettyPrinter()
formatted = pp.pformat(structure)
for line in formatted.splitlines():
nextline = " %s\n" % line
result = result + nextline
return result
@classmethod
def hex_to_dec(cls, hx):
"""Change hex to decimal."""
try:
integer = int(str(hx), 16)
except Exception as e:
print("Unable to convert %s to an integer" % str(hx))
print(e)
integer = 0
return str(integer)
@classmethod
def construct_feed_file_name(cls, feed_dir, prefix):
"""Construct full path for feed file."""
file_name = "%s.csv.gz" % prefix
dest_file_name = os.path.join(feed_dir, file_name)
return dest_file_name
@classmethod
def get_performance_metrics(cls, application_uptime_s, queue_sizes={}):
"""Get sensor hardware and os performance statistics."""
retval = {}
retval["queue_sizes"] = queue_sizes
cpu_times = psutil.cpu_times()
retval["scan_program"] = "health_check"
retval["timestamp"] = Utility.get_now_string()
retval["cpu_percent"] = psutil.cpu_percent(percpu=True)
retval["cpu_times"] = {"user": cpu_times.user,
"system": cpu_times.system,
"idle": cpu_times.idle,
"iowait": cpu_times.iowait}
retval["mem"] = {"free": psutil.virtual_memory().free,
"swap_percent_used": psutil.swap_memory().percent}
retval["root_vol"] = psutil.disk_usage('/').percent
retval["data_vol"] = psutil.disk_usage('/data/').percent
retval["application_uptime_seconds"] = application_uptime_s
return retval
@classmethod
def validate_geojson(cls, geojson):
"""Ensure that geojson contains the right fields"""
valid = True
if len(geojson["coordinates"]) != 2:
valid = False
elif "type" not in geojson:
valid = False
elif geojson["type"] != "Point":
valid = False
return valid
@classmethod
def create_gmaps_link(cls, lat, lon):
return ("https://www.google.com/maps/search/?api=1&query=%s,%s" % (lat, lon)) # NOQA
# @classmethod
# def hdmi_print(cls, message):
# with open('/dev/tty1', 'w') as ttyfile:
# print(message, file=ttyfile)
# return