app/dynamodb/csv_export.py
import configparser
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
from tqdm import tqdm
import csv
import json
from decimal import Decimal
from typing import Any, Tuple, Dict
def csv_export(table: Any, file: str, parameters: Dict = {}) -> Tuple:
"""Export DynamoDB table to csv
Args:
table (Table): boto3 DynamoDB table object
file (str): csv file path
Returns:
Tuple: result message and exit code
"""
# read csv spec
try:
csv_spec = configparser.ConfigParser()
csv_spec.optionxform = str
csv_spec.read(f"{file}.spec")
except Exception as e:
return (f"CSV specification file can't read:{e}", 1)
# get delimiter options
if "DELIMITER_OPTION" in csv_spec:
delimiter = csv_spec.get("DELIMITER_OPTION", "DelimiterCharacter")
else:
delimiter = " " # default
# write csv
try:
with open(file, mode="w", encoding="utf_8") as f:
print("please wait {name} exporting {file}".format(
name=table.name, file=file))
export_items = []
try:
if "QUERY_OPTION" in csv_spec:
try:
# Partition key option
if "PKAttribute" in csv_spec["QUERY_OPTION"]:
pk_key = csv_spec.get("QUERY_OPTION", "PKAttribute")
pk_value = csv_spec.get("QUERY_OPTION", "PKAttributeValue")
pk_type = csv_spec.get("QUERY_OPTION", "PKAttributeType")
if pk_type == "I":
pk_value = int(pk_value)
parameters["KeyConditionExpression"] = Key(pk_key).eq(pk_value)
# Sort key option
if "SKAttribute" in csv_spec["QUERY_OPTION"]:
sk_key = csv_spec.get("QUERY_OPTION", "SKAttribute")
sk_values = csv_spec.get("QUERY_OPTION", "SKAttributeValues").split(",")
sk_type = csv_spec.get("QUERY_OPTION", "SKAttributeType")
if sk_type == "I":
sk_values = [int(v) for v in sk_values]
sk_values = sk_values[0] if len(sk_values) == 1 else sk_values
sk_expression = csv_spec.get("QUERY_OPTION", "SKAttributeExpression")
if sk_expression == "begins_with":
parameters["KeyConditionExpression"] &= Key(sk_key).begins_with(sk_values)
elif sk_expression == "between":
parameters["KeyConditionExpression"] &= Key(sk_key).between(sk_values[0], sk_values[1])
elif sk_expression == "eq":
parameters["KeyConditionExpression"] &= Key(sk_key).eq(sk_values)
elif sk_expression == "gt":
parameters["KeyConditionExpression"] &= Key(sk_key).gt(sk_values)
elif sk_expression == "gte":
parameters["KeyConditionExpression"] &= Key(sk_key).gte(sk_values)
elif sk_expression == "lt":
parameters["KeyConditionExpression"] &= Key(sk_key).lt(sk_values)
elif sk_expression == "lte":
parameters["KeyConditionExpression"] &= Key(sk_key).lte(sk_values)
# query table
while True:
response = table.query(**parameters)
export_items.extend(response["Items"])
if ("LastEvaluatedKey" in response):
parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
else:
break
except Exception as e:
return (f"query option error:{e}", 1)
else:
# scan table
while True:
response = table.scan(**parameters)
export_items.extend(response["Items"])
if ("LastEvaluatedKey" in response):
parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
else:
break
except ClientError as e:
return (f"aws client error:{e}", 1)
except Exception as e:
return (f"table not found:{e}", 1)
is_write_csv_header_labels = False
for item in tqdm(export_items):
if not is_write_csv_header_labels:
# write csv header labels
csv_header_labels = list(csv_spec["CSV_SPEC"])
writer = csv.DictWriter(f, fieldnames=csv_header_labels, lineterminator="\n")
writer.writeheader()
is_write_csv_header_labels = True
# updated dict to match specifications
for key in list(item.keys()):
try:
spec = csv_spec.get("CSV_SPEC", key)
except Exception:
# Removed attributes that do not match the specifications
del item[key]
continue
item[key] = convert_item(spec, item, key, delimiter)
writer.writerow(item)
return ("{name} csv exported {count} items".format(
name=table.name, count=len(export_items)), 0)
except IOError as e:
print(f"I/O error:{e}")
except Exception as e:
return (str(e), 1)
def convert_item(spec: str, item: Dict, key: str, delimiter: str) -> Any:
"""convert item
Args:
spec (str): type of item
row (Dict): row data
key (str): key
delimiter (str): list join delimiter
Returns:
Any: converted item value
"""
if spec == "S": # String
return str(item[key])
elif spec == "I": # Integer
return int(item[key])
elif spec == "D": # Decimal
return float(item[key])
elif spec == "B": # Boolean
if not item[key]:
return ""
elif spec == "J": # Json
return json.dumps(item[key], default=decimal_encode)
elif spec == "SL" or spec == "SS": # StringList or StringSet
return delimiter.join(item[key])
elif spec == "DL" or spec == "DS": # DecimalList or DecimalSetDecimalList
return delimiter.join(list(map(str, item[key])))
else:
return item[key]
def decimal_encode(obj: Any) -> float:
"""encode decimal
Args:
obj (Any): object
Raises:
TypeError: Not decimal object
Returns:
float: decimal object
"""
if isinstance(obj, Decimal):
return float(obj)
raise TypeError