XYOracleNetwork/sdk-archivist-nodejs

View on GitHub
src/repository/dynamodb/table/boundwitness.ts

Summary

Maintainability
C
7 hrs
Test Coverage
/* eslint-disable require-await */
/*
 * File: main-table.ts
 * Project: @xyo-network/sdk-archivist-nodejs
 * File Created: Tuesday, 23rd April 2019 8:14:51 am
 * Author: XYO Development Team (support@xyo.network)
 * -----
 * Last Modified: Friday, 13th November 2020 3:00:09 pm
 * Modified By: XYO Development Team (support@xyo.network>)
 * -----
 * Copyright 2017 - 2019 XY - The Persistent Company
 */

import { DynamoDB } from 'aws-sdk'
import lruCache from 'lru-cache'

import { Table } from './table'

export class BoundWitnessTable extends Table {
  private cache: lruCache<string, Buffer>

  constructor(tableName = 'xyo-archivist-boundwitness', region = 'us-east-1') {
    super(tableName, region)

    this.cache = new lruCache({
      max: 5000,
      maxAge: 1000 * 60 * 60, // one hour
    })

    this.createTableInput = {
      AttributeDefinitions: [
        {
          AttributeName: 'BlockHash',
          AttributeType: 'B',
        },
      ],
      KeySchema: [
        {
          AttributeName: 'BlockHash',
          KeyType: 'HASH',
        },
      ],
      ProvisionedThroughput: {
        ReadCapacityUnits: 5,
        WriteCapacityUnits: 5,
      },
      TableName: tableName,
    }
  }

  public async getItem(hash: Buffer): Promise<any> {
    return new Promise<boolean>((resolve: any, reject: any) => {
      try {
        const value = this.cache.get(hash.toString())
        if (value) {
          resolve(value)
          return
        }
        const params: DynamoDB.Types.GetItemInput = {
          Key: {
            BlockHash: {
              B: hash,
            },
          },
          ReturnConsumedCapacity: 'TOTAL',
          TableName: this.tableName,
        }
        this.dynamodb.getItem(
          params,
          (err: any, data: DynamoDB.Types.GetItemOutput) => {
            if (err) {
              reject(err)
            }
            if (data.Item) {
              const result = data.Item.Data.B as Buffer
              this.cache.set(hash.toString(), result)
              resolve(result)
              return
            }
            resolve()
          }
        )
      } catch (ex) {
        this.logError(ex)
        reject(ex)
      }
    })
  }

  public async putItem(hash: Buffer, originBlock: Buffer): Promise<void> {
    return new Promise<void>((resolve: any, reject: any) => {
      try {
        const params: DynamoDB.Types.PutItemInput = {
          Item: {
            BlockHash: {
              B: hash,
            },
            Data: {
              B: originBlock,
            },
          },
          ReturnConsumedCapacity: 'TOTAL',
          TableName: this.tableName,
        }
        this.dynamodb.putItem(
          params,
          (err: any, _data: DynamoDB.Types.PutItemOutput) => {
            if (err) {
              reject(err)
            }
            this.cache.set(hash.toString(), originBlock)
            resolve()
          }
        )
      } catch (ex) {
        this.logError(ex)
        reject(ex)
      }
    })
  }

  public async deleteItem(hash: Buffer): Promise<void> {
    return new Promise<void>((resolve: any, reject: any) => {
      try {
        const params: DynamoDB.Types.DeleteItemInput = {
          Key: {
            BlockHash: {
              B: hash,
            },
          },
          ReturnConsumedCapacity: 'TOTAL',
          TableName: this.tableName,
        }
        this.dynamodb.deleteItem(
          params,
          (err: any, _data: DynamoDB.Types.DeleteItemOutput) => {
            if (err) {
              this.logError(err)
              reject(err)
            }
            this.cache.del(hash.toString())
            resolve()
          }
        )
      } catch (ex) {
        this.logError(ex)
        reject(ex)
      }
    })
  }

  public async scan(
    limit: number,
    offsetHash?: Buffer | undefined
  ): Promise<any[]> {
    return new Promise<[]>((resolve: any, reject: any) => {
      try {
        const params: DynamoDB.Types.ScanInput = {
          Limit: limit,
          ReturnConsumedCapacity: 'TOTAL',
          TableName: this.tableName,
        }
        if (offsetHash) {
          params.ExclusiveStartKey = {
            BlockHash: {
              B: offsetHash,
            },
          }
        }

        this.dynamodb.scan(
          params,
          async (err: any, data: DynamoDB.Types.ScanOutput) => {
            if (err) {
              this.logError(err)
              reject(err)
            }

            let result = []

            if (data.Items) {
              for (const item of data.Items) {
                if (
                  item.BlockHash &&
                  item.BlockHash.B &&
                  item.Data &&
                  item.Data.B
                ) {
                  const payload = item.Data.B as Buffer
                  this.cache.set(item.BlockHash.B.toString(), payload)
                  result.push(payload)
                } else {
                  this.logError(
                    `Result with Missing BlockHash or Data: ${item}`
                  )
                }
              }
            }

            // if there is a LastEvaluatedKey, we need to get the next page because dynamodb limits a scan to 1 megabyte
            if (result.length < limit && data.LastEvaluatedKey) {
              const delta = limit - result.length
              const nextPage = await this.scan(
                delta,
                data.LastEvaluatedKey.BlockHash.B as Buffer
              )
              result = result.concat(nextPage)
            }

            resolve(result)
          }
        )
      } catch (ex) {
        this.logError(ex)
        reject(ex)
      }
    })
  }
}