dataplug-io/hubspot-dataplug

View on GitHub
lib/collections/contacts.js

Summary

Maintainability
C
1 day
Test Coverage
const _ = require('lodash')
const { URL } = require('url')
const moment = require('moment')
const { Source, Mapper } = require('@dataplug/dataplug')
const { HttpGetReader, PagedHttpGetReader } = require('@dataplug/dataplug-http')
const { JsonStreamReader } = require('@dataplug/dataplug-json')
const responseHandlerFactory = require('../responseHandlerFactory')
const config = require('../config')

// https://developers.hubspot.com/docs/methods/contacts/contacts-overview
const schema = {
  type: 'object',
  definitions: {
    sourceType: {
      type: ['string', 'null']
    },
    property: {
      type: 'object',
      properties: {
        value: {
          type: 'string'
        },
        versions: {
          type: 'array',
          items: {
            $ref: '#/definitions/valueHistory'
          }
        }
      },
      additionalProperties: false
    },
    valueHistory: {
      type: 'object',
      properties: {
        value: {
          type: 'string'
        },
        'source-type': {
          $ref: '#/definitions/sourceType'
        },
        'source-id': {
          type: ['string', 'null']
        },
        'source-label': {
          type: ['string', 'null']
        },
        timestamp: {
          type: 'integer'
        },
        selected: {
          type: 'boolean'
        }
      },
      additionalProperties: false,
      required: ['timestamp']
    },
    formSubmission: {
      type: 'object',
      properties: {
        'conversion-id': {
          type: 'string'
        },
        timestamp: {
          type: 'integer'
        },
        'form-id': {
          type: 'string'
        },
        'form-type': {
          type: 'string'
        },
        'portal-id': {
          type: 'integer'
        },
        'page-url': {
          type: 'string'
        },
        'page-title': {
          type: 'string'
        },
        title: {
          type: 'string'
        },
        'meta-data': {
          type: 'array',
          items: {
            type: 'object'
          }
        }
      },
      additionalProperties: false,
      required: ['timestamp']
    },
    listMembership: {
      type: 'object',
      properties: {
        'static-list-id': {
          type: 'integer'
        },
        'internal-list-id': {
          type: 'integer'
        },
        timestamp: {
          type: 'integer'
        },
        vid: {
          type: 'integer'
        },
        'is-member': {
          type: 'boolean'
        }
      },
      additionalProperties: false
    },
    identityProfile: {
      type: 'object',
      properties: {
        vid: {
          type: 'integer'
        },
        'saved-at-timestamp': {
          type: 'integer'
        },
        'deleted-changed-timestamp': {
          type: 'integer'
        },
        identities: {
          type: 'array',
          items: {
            $ref: '#/definitions/identity'
          }
        }
      },
      additionalProperties: false,
      required: ['vid']
    },
    identity: {
      type: 'object',
      properties: {
        type: {
          type: 'string'
        },
        value: {
          type: 'string'
        },
        timestamp: {
          type: 'integer'
        },
        'is-primary': {
          type: 'boolean'
        }
      },
      additionalProperties: false
    },
    mergeAudit: {
      type: 'object',
      properties: {
        'canonical-vid': {
          type: 'integer'
        },
        'vid-to-merge': {
          type: 'integer'
        },
        timestamp: {
          type: 'integer'
        },
        'entity-id': {
          type: 'string'
        },
        'user-id': {
          type: 'integer'
        },
        'num-properties-moved': {
          type: 'integer'
        },
        merged_from_email: {
          $ref: '#/definitions/mergedFromEmail'
        },
        merged_to_email: {
          $ref: '#/definitions/mergedToEmail'
        }
      },
      additionalProperties: false,
      required: ['timestamp']
    },
    mergedFromEmail: {
      type: 'object',
      properties: {
        value: {
          type: 'string'
        },
        sourceType: {
          $ref: '#/definitions/sourceType'
        },
        'source-id': {
          type: ['string', 'null']
        },
        'source-label': {
          type: ['string', 'null']
        },
        'source-vids': {
          type: 'array',
          items: {
            type: 'integer'
          }
        },
        timestamp: {
          type: 'integer'
        },
        selected: {
          type: 'boolean'
        }
      },
      additionalProperties: false
    },
    mergedToEmail: {
      type: 'object',
      properties: {
        value: {
          type: 'string'
        },
        sourceType: {
          $ref: '#/definitions/sourceType'
        },
        'source-id': {
          type: ['string', 'null']
        },
        'source-label': {
          type: ['string', 'null']
        },
        timestamp: {
          type: 'integer'
        },
        selected: {
          type: 'boolean'
        }
      },
      additionalProperties: false
    },
    associatedCompanyProperty: {
      type: 'object',
      properties: {
        value: {
          type: 'string'
        }
      }
    },
    associatedCompany: {
      type: 'object',
      properties: {
        'portal-id': {
          type: 'integer'
        },
        'company-id': {
          type: 'integer'
        },
        properties: {
          type: 'object',
          patternProperties: {
            '^.*$': {
              $ref: '#/definitions/associatedCompanyProperty'
            }
          },
          additionalProperties: false
        }
      },
      additionalProperties: false,
      required: ['portal-id', 'company-id']
    }
  },
  properties: {
    vid: {
      type: 'integer'
    },
    'canonical-vid': {
      type: 'integer'
    },
    'merged-vids': {
      type: 'array',
      items: {
        type: 'integer'
      }
    },
    'portal-id': {
      type: 'integer'
    },
    'is-contact': {
      type: 'boolean'
    },
    'profile-token': {
      type: 'string'
    },
    'profile-url': {
      type: 'string'
    },
    properties: {
      type: 'object',
      patternProperties: {
        '^.*$': {
          $ref: '#/definitions/property'
        }
      },
      additionalProperties: false
    },
    'form-submissions': {
      type: 'array',
      items: {
        $ref: '#/definitions/formSubmission'
      }
    },
    'list-memberships': {
      type: 'array',
      items: {
        $ref: '#/definitions/listMembership'
      }
    },
    'identity-profiles': {
      type: 'array',
      items: {
        $ref: '#/definitions/identityProfile'
      }
    },
    'merge-audits': {
      type: 'array',
      items: {
        $ref: '#/definitions/mergeAudit'
      }
    },
    'associated-company': {
      $ref: '#/definitions/associatedCompany'
    }
  },
  additionalProperties: false,
  required: ['portal-id', 'canonical-vid']
}

const earliestSince = moment()
  .subtract(30, 'days')
  .add(1, 'seconds')
  .milliseconds(0)
const configDeclaration = config.declaration.extended((declaration) => declaration
  .parameters({
    recently: {
      description: 'Fetch only recent comapneis in descending order by date',
      enum: ['created', 'updated']
    },
    since: {
      description: 'Date and time to query data since',
      type: 'string',
      format: 'date-time',
      default: earliestSince.format()
    },
    propertyMode: {
      description: 'Fetch only property current value or with historical values',
      enum: ['value_only', 'value_and_history'],
      default: 'value_and_history'
    },
    formSubmissionMode: {
      description: 'Specifies which form submissions should be fetched',
      enum: ['all', 'none', 'newest', 'oldest'],
      default: 'all'
    },
    showListMemberships: {
      description: 'List current contact memberships or no',
      type: 'boolean',
      default: true
    },
    property: {
      describe: 'Only fetch property specified, by default all are fetched',
      type: 'array',
      item: 'string'
    }
  })
  // TODO: implies('since', 'recently')
)

const contactsQueryMapping = config.mapping.query.extended((mapping) => mapping
  .asIs('propertyMode')
  .asIs('formSubmissionMode')
  .asIs('showListMemberships')
  .default('count', () => 100))

const contactDetailsQueryMapping = config.mapping.query.extended((mapping) => mapping
  .asIs('propertyMode')
  .asIs('formSubmissionMode')
  .asIs('showListMemberships'))

const factory = (params) => {
  params = _.clone(params)

  let uri = '/contacts/v1/lists/all/contacts/all'
  if (params.recently === 'created') {
    uri = '/contacts/v1/lists/all/contacts/recent'
  } else if (params.recently === 'updated') {
    uri = '/contacts/v1/lists/recently_updated/contacts/recent'
  }

  let since
  if (params.recently) {
    since = moment(params.since)
    if (since.diff(earliestSince) < 0) {
      throw new Error(`Invalid range: since ${since} can not be before ${earliestSince}`)
    }
  } else {
    delete params.since
  }

  const url = new URL(uri, params.endpoint)
  const nextPage = (page, data) => {
    if (data && data['has-more']) {
      const vidOffset = data['vid-offset']
      page.query['vidOffset'] = vidOffset

      // Used in recentlyCreated & recentlyUpdated modes
      const timeOffset = data['time-offset']
      if (timeOffset) {
        if (since && since.valueOf() > timeOffset) {
          return false
        }
        page.query['timeOffset'] = timeOffset
      }

      return true
    }
    return false
  }
  const transformFactory = () => new JsonStreamReader('!.contacts.*')
  const query = contactsQueryMapping.apply(params)
  const headers = config.mapping.headers.apply(params)
  const responseHandler = responseHandlerFactory(params)

  const detailsMapper = (contact) => {
    if (!contact || (contact.addedAt && since && since.valueOf() > contact.addedAt)) {
      return
    }
    const url = new URL(`/contacts/v1/contact/vid/${contact.vid}/profile`, params.endpoint)
    const query = contactDetailsQueryMapping.apply(params)
    const headers = config.mapping.headers.apply(params)

    const transform = new JsonStreamReader('!')
    return new HttpGetReader(url, {
      transform,
      query,
      headers,
      responseHandler
    })
  }

  const contactsStream = new PagedHttpGetReader(url, nextPage, {
    transformFactory,
    query,
    headers,
    responseHandler
  })
  const mappedStream = new Mapper(detailsMapper)
  return [contactsStream, mappedStream]
}

const source = new Source(configDeclaration, factory)

module.exports = {
  origin: 'hubspot',
  name: 'contacts',
  schema,
  source
}