nexgddp/services/query_service.py
"""QUERY SERVICE"""
import logging
import os
import tempfile
import dateutil.parser
import dateutil.parser
from RWAPIMicroservicePython import request_to_microservice
from requests import Request, Session
from requests.exceptions import ConnectTimeout
from nexgddp.errors import SqlFormatError, PeriodNotValid, TableNameNotValid, GeostoreNeeded, RasdamanError
from nexgddp.helpers.coloring_helper import ColoringHelper
from nexgddp.services.xml_service import XMLService
RASDAMAN_URL = os.getenv('RASDAMAN_URL')
class QueryService(object):
@staticmethod
def get_stats(scenario, model, years, indicator, bbox, function):
logging.info('[QueryService] Getting aggregated temporal series from rasdaman')
year_min = sorted(years)[0]
year_max = sorted(years)[-1]
logging.info(year_min)
logging.info(year_max)
results = []
if bbox == []:
raise GeostoreNeeded("No latitude and longitude provided")
else:
bbox_str = f",Lat({bbox[0]}),Long({bbox[1]})"
# IMPORTANT - Rasdaman tablename must be in this format ->
query = f"for cov in ({scenario}_{model}_processed) return encode( {function}((cov.{indicator})[ ansi(\"{year_min}\":\"{year_max}\") {bbox_str}]), \"CSV\")"
logging.info('Running the query ' + query)
raster_filename = QueryService.get_rasdaman_query(query)
try:
datafile = open(raster_filename, "r")
raw_data = datafile.read()
processed_data = raw_data.replace('{', '').replace('}', '').split(',')
finally:
source_raster = None
# Removing the raster
os.remove(os.path.join('/tmp', raster_filename))
return map(float, processed_data)
return results
@staticmethod
def get_temporal_series(scenario, model, years, indicator, bbox):
logging.info('[QueryService] Getting temporal series from rasdaman')
year_min = sorted(years)[0].replace("'", '') if type(sorted(years)[0]) == 'string' else sorted(years)[0]
year_max = sorted(years)[-1].replace("'", '') if type(sorted(years)[-1]) == 'string' else sorted(years)[-1]
logging.info(year_min)
logging.info(year_max)
results = []
if bbox == []:
raise GeostoreNeeded("No latitude and longitude provided")
else:
bbox_str = f",Lat({bbox[0]}),Long({bbox[1]})"
ansidate_str = f"ansi(\"{year_min}\":\"{year_max}\")" if year_min != year_max else f"ansi(\"{year_min}\")"
query = f"for cov in ({scenario}_{model}_processed) return encode( (cov.{indicator})[ {ansidate_str} {bbox_str}], \"CSV\")"
logging.info('Running the query ' + query)
raster_filename = QueryService.get_rasdaman_query(query)
try:
datafile = open(raster_filename, "r")
raw_data = datafile.read()
processed_data = raw_data.replace('{', '').replace('}', '').split(',')
finally:
source_raster = None
# Removing the raster
os.remove(os.path.join('/tmp', raster_filename))
return map(float, processed_data)
@staticmethod
def get_all_data(scenario, model, years, bbox):
logging.info('[QueryService] Getting * data from rasdaman')
year_min = sorted(years)[0]
year_max = sorted(years)[-1]
results = []
if bbox == []:
raise GeostoreNeeded("No latitude and longitude provided")
else:
bbox_str = f",Lat({bbox[0]}),Long({bbox[1]})"
query = f"for cov in ({scenario}_{model}_processed) return encode( (cov)[ ansi(\"{year_min}\":\"{year_max}\") {bbox_str}], \"CSV\")"
logging.info('Running the query ' + query)
raster_filename = QueryService.get_rasdaman_query(query)
try:
output = {}
datafile = open(raster_filename, "r")
raw_data = datafile.read()
processed_data = raw_data.replace('{', '').replace('}', '').split(',')
fields_xml = QueryService.get_rasdaman_fields(scenario, model)
fields = XMLService.get_fields(fields_xml)
fields_without_year = dict((i, fields[i]) for i in fields if i != 'year')
varnames = list()
for key in fields_without_year:
varnames.append(key)
logging.debug(f"varnames: {varnames}")
results = [{} for _ in range(len(years))]
ansi_years = list(map(lambda x: dateutil.parser.parse(f"{x}-01-01").isoformat(), years))
logging.debug(ansi_years)
logging.debug(f"years: {years}")
logging.debug(f"processed_data: {processed_data}")
data_array = list()
for element in processed_data:
data_array.append(list(map(float, element.replace('"', '').split(' '))))
logging.debug(data_array)
processed_obj = list(map(lambda x: dict(zip(varnames, x)), data_array))
# logging.debug(processed_obj)
for i in range(len(years)):
results[i]['year'] = dateutil.parser.parse(f"{years[i]}-01-01").isoformat()
results[i].update(processed_obj[i])
finally:
source_raster = None
# Removing the raster. Really important!
os.remove(os.path.join('/tmp', raster_filename))
return results
@staticmethod
def get_rasdaman_query(query):
logging.info('[QueryService] Executing rasdaman query')
payload_arr = ['<?xml version="1.0" encoding="UTF-8" ?>',
'<ProcessCoveragesRequest xmlns="http://www.opengis.net/wcps/1.0" service="WCPS" version="1.0.0">',
'<query><abstractSyntax>',
query,
'</abstractSyntax></query>',
'</ProcessCoveragesRequest>']
payload = ''.join(payload_arr)
headers = {'Content-Type': 'application/xml'}
request = Request(
method='POST',
url=RASDAMAN_URL,
data=payload,
headers=headers
)
session = Session()
prepped = session.prepare_request(request)
response = session.send(prepped)
if response.status_code == 404:
raise PeriodNotValid('Period Not Valid')
with tempfile.NamedTemporaryFile(suffix='.tiff', delete=False) as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
raster_filename = f.name
logging.debug(f"[QueryService] Temporary raster filename: {raster_filename}")
f.close()
return raster_filename
@staticmethod
def get_rasdaman_csv_query(query):
logging.info('[QueryService] Executing rasdaman query')
payload_arr = ['<?xml version="1.0" encoding="UTF-8" ?>',
'<ProcessCoveragesRequest xmlns="http://www.opengis.net/wcps/1.0" service="WCPS" version="1.0.0">',
'<query><abstractSyntax>',
query,
'</abstractSyntax></query>',
'</ProcessCoveragesRequest>']
payload = ''.join(payload_arr)
headers = {'Content-Type': 'application/xml'}
request = Request(
method='POST',
url=RASDAMAN_URL,
data=payload,
headers=headers
)
session = Session()
prepped = session.prepare_request(request)
response = session.send(prepped)
return response.text
@staticmethod
def get_tile_query(bbox, year, model, scenario, indicator, bounds):
logging.info('[QueryService] Forming rasdaman query')
coverage = f'{scenario}_{model}_processed'
logging.debug(f'coverage: {coverage}')
query_list = [
'for cov in (',
coverage,
') return encode(scale((((cov.',
indicator,
')[ansi("',
f'{year}',
'"),Lat(',
str(bbox['lat'][0]),
':',
str(bbox['lat'][1]),
'),Long(',
str(bbox['lon'][0]),
':',
str(bbox['lon'][1]),
')]',
ColoringHelper.normalize(
*bounds
),
', {Lat: "CRS:1"(0:255), Long: "CRS:1"(0:255)}), "PNG")]'
]
envelope_list = ['<?xml version="1.0" encoding="UTF-8" ?>',
'<ProcessCoveragesRequest xmlns="http://www.opengis.net/wcps/1.0" service="WCPS" version="1.0.0">',
'<query><abstractSyntax>',
*query_list,
'</abstractSyntax></query>',
'</ProcessCoveragesRequest>'
]
payload = ''.join(envelope_list)
logging.debug(f"payload: {payload}")
headers = {'Content-Type': 'application/xml'}
request = Request(
method='POST',
url=RASDAMAN_URL,
data=payload,
headers=headers
)
session = Session()
prepped = session.prepare_request(request)
response = session.send(prepped)
if response.status_code == 404:
raise PeriodNotValid('Data not found')
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
raster_filename = f.name
logging.debug(f"[QueryService] Temporary raster filename: {raster_filename}")
f.close()
return raster_filename
@staticmethod
def get_tile_mask_query(bbox, year, model, scenario, indicator, no_data):
logging.info('[QueryService] Forming rasdaman query')
coverage = f'{scenario}_{model}_processed'
logging.debug(f'coverage: {coverage}')
bounds_expr = f"[ansi(\"{str(year)}\"), Lat({bbox['lat'][0]}:{bbox['lat'][1]}), Long({bbox['lon'][0]}:{bbox['lon'][1]})]"
logging.debug(f'bounds_expr: {bounds_expr}')
query_str = f'for cov in ({coverage}) return encode(scale(((cov.{indicator}){bounds_expr} = {str(no_data)} ) * 255, {{Lat: "CRS:1"(0:255), Long: "CRS:1"(0:255)}}),"PNG")'
envelope_list = ['<?xml version="1.0" encoding="UTF-8" ?>',
'<ProcessCoveragesRequest xmlns="http://www.opengis.net/wcps/1.0" service="WCPS" version="1.0.0">',
'<query><abstractSyntax>',
query_str,
'</abstractSyntax></query>',
'</ProcessCoveragesRequest>'
]
payload = ''.join(envelope_list)
logging.debug(f"payload: {payload}")
headers = {'Content-Type': 'application/xml'}
request = Request(
method='POST',
url=RASDAMAN_URL,
data=payload,
headers=headers
)
session = Session()
prepped = session.prepare_request(request)
response = session.send(prepped)
if response.status_code == 404:
raise PeriodNotValid('Data not found')
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
raster_filename = f.name
logging.debug(f"[QueryService] Temporary raster filename: {raster_filename}")
f.close()
return raster_filename
@staticmethod
def get_tile_diff_query(bbox, year, model, scenario, indicator, bounds, year_b, dset_b):
logging.info('[QueryService] Forming rasdaman query')
coverage_a = f'{scenario}_{model}_processed'
logging.debug(f'coverage_a: {coverage_a}')
logging.debug(f'coverage_b: {dset_b}')
logging.debug(f'bounds: {bounds}')
lower_bound_expr = ' - ' + str(bounds[0]) if float(bounds[0]) >= 0 else str(' + ' + str(abs(bounds[0])))
logging.debug(lower_bound_expr)
bounds_range = str(float(bounds[1]) - float(bounds[0]))
logging.debug(bounds_range)
# x := (c - a) / (b - a)
bbox_expr = f" Lat({bbox['lat'][0]}:{bbox['lat'][1]}),Long({bbox['lon'][0]}:{bbox['lon'][1]})]"
query = ''.join([
f"for cov1 in ({dset_b}), cov2 in ({coverage_a}) return encode(scale(",
f"(((cov1.{indicator})[ansi(\"{year_b}\"),",
bbox_expr,
f" - (cov2.{indicator})[ansi(\"{year}\"),",
bbox_expr,
f") {lower_bound_expr} ) * (255 / ({bounds_range} )),",
"{Lat: \"CRS:1\"(0:255), Long: \"CRS:1\"(0:255)}),\"PNG\")]" # Not a f-expression
])
logging.debug(query)
envelope_list = ['<?xml version="1.0" encoding="UTF-8" ?>',
'<ProcessCoveragesRequest xmlns="http://www.opengis.net/wcps/1.0" service="WCPS" version="1.0.0">',
'<query><abstractSyntax>',
query,
'</abstractSyntax></query>',
'</ProcessCoveragesRequest>'
]
payload = ''.join(envelope_list)
logging.debug(f"payload: {payload}")
headers = {'Content-Type': 'application/xml'}
request = Request(
method='POST',
url=RASDAMAN_URL,
data=payload,
headers=headers
)
session = Session()
prepped = session.prepare_request(request)
response = session.send(prepped)
if response.status_code == 404:
raise PeriodNotValid('Data not found')
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
raster_filename = f.name
logging.debug(f"[QueryService] Temporary raster filename: {raster_filename}")
f.close()
return raster_filename
@staticmethod
def get_rasdaman_fields(scenario, model):
# Need to parse xml
logging.info(f"[QueryService] Getting fields for scenario {scenario} and model {model}")
headers = {'Content-Type': 'application/xml'}
params = {
'SERVICE': 'WCS',
'VERSION': '2.0.1',
'REQUEST': 'DescribeCoverage',
'COVERAGEID': f"{scenario}_{model}_processed"
}
request = Request(
method='GET',
url=RASDAMAN_URL,
headers=headers,
params=params
)
session = Session()
prepped = session.prepare_request(request)
try:
response = session.send(prepped, timeout=15)
except ConnectTimeout:
raise RasdamanError('Rasdaman not reachable')
if response.status_code == 404:
raise TableNameNotValid('Table Name Not Valid')
output = response.text
return output
@staticmethod
def convert(query, api_key):
logging.info('Converting Query: ' + query)
try:
response = request_to_microservice(
uri=f'/v1/convert/sql2SQL?sql={query}',
api_key=api_key,
method='GET'
)
except Exception as error:
raise error
if response.get('errors'):
errors = response.get('errors')
raise SqlFormatError(message=errors[0].get('detail'))
query = response.get('data', None)
s_query = query.get('attributes', {}).get('query')
json_sql = query.get('attributes', {}).get('jsonSql')
return s_query, json_sql
@staticmethod
def get_clone_url(dataset_id, query):
return {
'httpMethod': 'POST',
'url': '/v1/dataset/' + dataset_id + '/clone',
'body': {
'dataset': {
'datasetUrl': '/query/' + dataset_id + '?sql=' + query,
'application': [
'your',
'apps'
]
}
}
}
@staticmethod
def get_domain(scenario, model):
logging.info(f"Obtaining domain for the scenario {scenario}, and model {model}")
domain_xml = QueryService.get_rasdaman_fields(scenario, model)
domain = XMLService.get_domain(domain_xml)
domain_data = {
"lat": {
"max": domain.get('upperCorner')[0],
"min": domain.get('lowerCorner')[0]
},
"lon": {
"max": domain.get('upperCorner')[1],
"min": domain.get('lowerCorner')[1]
},
"year": {
"max": dateutil.parser.parse(domain.get('upperCorner')[2]).isoformat(),
"min": dateutil.parser.parse(domain.get('lowerCorner')[2]).isoformat()
}
}
return domain_data