danishi/dynamodb-csv

View on GitHub
app/dynamodb/csv_import.py

Summary

Maintainability
C
1 day
Test Coverage
import configparser
import csv
import json
from decimal import Decimal
from typing import Any, Dict, List, Tuple

from tqdm import tqdm

# Module-level running totals: write_to_dynamo() increments them and
# csv_import() reports them in its result message.
count, error_count = 0, 0


def csv_import(table: Any, file: str, ignore: bool = False) -> Tuple:
    """csv import into DynamoDB table

    Column types come from the ``CSV_SPEC`` section of ``<file>.spec``
    (one ``column = type`` entry per CSV header). Optional settings:

    * ``[DELIMITER_OPTION] DelimiterCharacter`` - separator used to split
      list/set columns; defaults to a single space.
    * ``[IMPORT_OPTION] ConvertBlankToNullAttrs`` - comma-separated column
      names whose blank values are stored as ``None``.
    * ``[IMPORT_OPTION] ConvertBlankToDropAttrs`` - comma-separated column
      names whose blank values are removed from the item.

    Values that fail type conversion are dropped from the item.

    Args:
        table (Any): boto3 DynamoDB table object
        file (str): csv file path
        ignore (bool): ignore put item error

    Returns:
        Tuple: result message and exit code (0 on success, 1 when the spec
        or CSV file cannot be read/processed)
    """
    global count
    global error_count

    # The counters are module-level; reset them so each call reports only
    # its own totals instead of accumulating across calls.
    count = 0
    error_count = 0

    # read csv spec (the delimiter lives in the same file, so a missing
    # DelimiterCharacter is reported as a spec error rather than crashing)
    try:
        csv_spec = configparser.ConfigParser()
        csv_spec.optionxform = str  # keep column names case-sensitive
        csv_spec.read(f"{file}.spec")

        if "DELIMITER_OPTION" in csv_spec:
            delimiter = csv_spec.get("DELIMITER_OPTION", "DelimiterCharacter")
        else:
            delimiter = " "
    except Exception as e:
        return (f"CSV specification file can't read:{e}", 1)

    # read csv
    try:
        # Blank-handling lists are identical for every row: parse them once.
        to_null_attrs = _blank_attrs(csv_spec, "ConvertBlankToNullAttrs")
        to_drop_attrs = _blank_attrs(csv_spec, "ConvertBlankToDropAttrs")

        # newline="" is what the csv module requires so quoted fields with
        # embedded newlines are read correctly; utf_8_sig drops an optional
        # leading BOM that would otherwise end up in the first column name.
        with open(file, mode="r", encoding="utf_8_sig", newline="") as f:
            reader = csv.DictReader(f)

            print("please wait {name} importing {file}".format(
                name=table.name, file=file))

            # The whole file is converted first, then written in one call.
            batch = [
                _convert_row(row, csv_spec, delimiter, to_null_attrs, to_drop_attrs)
                for row in tqdm(reader)
            ]

            if batch:
                write_to_dynamo(table, batch, ignore)

        if ignore:
            message = "{name} csv imported {count} items and {error_count} error items".format(
                name=table.name, count=count, error_count=error_count)
        else:
            message = "{name} csv imported {count} items".format(
                name=table.name, count=count)
        return (message, 0)

    except Exception as e:
        return (f"CSV file can't read:{e}", 1)


def _blank_attrs(csv_spec: configparser.ConfigParser, option: str) -> set:
    """Return the column names listed in ``[IMPORT_OPTION] <option>``.

    Args:
        csv_spec (configparser.ConfigParser): parsed spec file
        option (str): option name inside the ``IMPORT_OPTION`` section

    Returns:
        set: column names (comma-separated in the spec), or an empty set when
        the section or option is absent
    """
    if "IMPORT_OPTION" in csv_spec and option in csv_spec["IMPORT_OPTION"]:
        return set(csv_spec.get("IMPORT_OPTION", option).split(","))
    return set()


def _convert_row(row: Dict, csv_spec: configparser.ConfigParser, delimiter: str,
                 to_null_attrs: set, to_drop_attrs: set) -> Dict:
    """Convert one DictReader row, in place, to typed item values.

    Args:
        row (Dict): row from csv.DictReader (mutated and returned)
        csv_spec (configparser.ConfigParser): parsed spec file
        delimiter (str): list/set split delimiter
        to_null_attrs (set): columns whose blank value becomes None
        to_drop_attrs (set): columns whose blank value is removed

    Returns:
        Dict: the converted row

    Raises:
        configparser.NoOptionError / NoSectionError: a column has no entry in
        ``CSV_SPEC`` (looked up before blank handling, as before).
    """
    for key in list(row.keys()):
        spec = csv_spec.get("CSV_SPEC", key)

        # Convert blank value
        if not row[key]:
            if key in to_null_attrs:
                row[key] = None
                continue
            if key in to_drop_attrs:
                del row[key]
                continue

        try:
            row[key] = convert_column(spec, row, key, delimiter)
        except Exception:
            # Values that cannot be converted are left out of the item.
            del row[key]
    return row


def convert_column(spec: str, row: Dict, key: str, delimiter: str) -> Any:
    """Convert one CSV cell to the Python type named by its spec code.

    Spec codes: ``S`` string, ``I`` integer, ``D`` Decimal, ``B`` boolean,
    ``J`` JSON (floats parsed as Decimal), ``SL``/``SS`` string list/set and
    ``DL``/``DS`` Decimal list/set, the collections split on ``delimiter``.
    Any other code returns the raw cell value unchanged.

    Args:
        spec (str): type of column
        row (Dict): row data
        key (str): key
        delimiter (str): list split delimiter

    Returns:
        Any: converted column value

    Raises:
        Exception: whatever the underlying conversion raises for a malformed
        value (e.g. ValueError from int/json, decimal.InvalidOperation).
    """
    value = row[key]

    if spec == "S":  # String
        return str(value)
    elif spec == "I":  # Integer
        return int(value)
    elif spec == "D":  # Decimal
        return Decimal(value)
    elif spec == "B":  # Boolean
        # bool() of any non-empty string is True, which would turn an
        # exported "False" (or "0") into True; treat those as False.
        return bool(value) and str(value).strip().lower() not in ("false", "0")
    elif spec == "J":  # Json
        return json.loads(value, parse_float=Decimal)
    elif spec == "SL":  # StringList
        return value.split(delimiter)
    elif spec == "SS":  # StringSet
        return set(value.split(delimiter))
    elif spec == "DL":  # DecimalList
        return list(map(Decimal, value.split(delimiter)))
    elif spec == "DS":  # DecimalSet
        return set(map(Decimal, value.split(delimiter)))
    else:
        return value


def write_to_dynamo(table: Any, rows: List[Dict], ignore: bool = False) -> None:
    """Write converted CSV rows into a DynamoDB table.

    Increments the module-level ``count`` for each item written and, in
    ``ignore`` mode, ``error_count`` for each item whose ``put_item`` raised.

    Args:
        table (Any): boto3 DynamoDB table object
        rows (List[Dict]): converted csv rows, one dict per item
        ignore (bool): when True, put items one by one and count failures
            instead of stopping; when False, use ``table.batch_writer``
    """
    global count
    global error_count

    # Ignore error item: one request per item so a bad item doesn't stop the rest
    if ignore:
        for item in tqdm(rows):
            try:
                table.put_item(Item=item)
            except Exception:
                error_count += 1
            else:
                count += 1
        return

    # Batch write
    try:
        # overwrite_by_pkeys: buffered items with the same key are replaced
        # by the newer one instead of causing a duplicate-key batch error.
        key_names = [x["AttributeName"] for x in table.key_schema]
        with table.batch_writer(overwrite_by_pkeys=key_names) as batch:
            for item in tqdm(rows):
                batch.put_item(Item=item)
                # NOTE: counts items handed to the writer; if a later flush
                # fails, this may include items that were never written.
                count += 1

    except Exception as e:
        print(f"Error executing batch_writer:{e}")