makeomatic/ms-payments

View on GitHub
src/actions/sale/sync.js

Summary

Maintainability
A
35 mins
Test Coverage
const { ActionTransport } = require('@microfleet/core');
const Promise = require('bluebird');
const moment = require('moment');
const forEach = require('lodash/forEach');

// helpers
const key = require('../../redis-key');
const { PAYPAL_DATE_FORMAT, SALES_ID_INDEX, SALES_DATA_PREFIX } = require('../../constants');
const { parseSale, saveCommon, getOwner } = require('../../utils/transactions');
const { serialize } = require('../../utils/redis');
const { payment: { list: listTransactions } } = require('../../utils/paypal');

const TRANSACTIONS_LIMIT = 20;

/**
 * @api {amqp} <prefix>.sale.sync Sync sale
 * @apiVersion 1.0.0
 * @apiName saleSync
 * @apiGroup Sale
 *
 * @apiDescription Performs synchronizations of sales
 * **TODO**: Find response schema
 * @apiSchema {jsonschema=sale/sync.json} apiRequest

 */
function transactionSync({ params: message = {} }) {
  const { config, redis } = this;
  const { paypal: paypalConfig } = config;

  function updateCommon(sale, owner) {
    return Promise.bind(this, parseSale(sale, owner)).then(saveCommon);
  }

  const getLatest = async () => {
    if (message.next_id) {
      return null;
    }

    const query = {
      order: 'DESC',
      criteria: 'create_time',
      offset: 0,
      limit: 1,
    };

    const { items } = await this.dispatch('sale.list', { params: query });

    return items;
  };

  function sendRequest(items) {
    const query = {
      count: TRANSACTIONS_LIMIT,
    };

    if (message.next_id) {
      query.start_id = message.next_id;
    } else if (items.length > 0) {
      query.start_time = moment(items[0].start_time).format(PAYPAL_DATE_FORMAT);
    }

    return listTransactions(query, paypalConfig);
  }

  function saveToRedis(data) {
    if (data.count === 0) {
      return null;
    }

    const pipeline = redis.pipeline();
    const updates = [];

    forEach(data.payments, (sale) => {
      const saleKey = key(SALES_DATA_PREFIX, sale.id);
      const owner = getOwner(sale);
      const saveData = {
        sale,
        owner,
        create_time: new Date(sale.create_time).getTime(),
        update_time: new Date(sale.update_time).getTime(),
      };

      pipeline.hmset(saleKey, serialize(saveData));
      pipeline.sadd(SALES_ID_INDEX, sale.id);

      updates.push(updateCommon.call(this, sale, owner));
    });

    updates.push(pipeline.exec());

    return Promise.all(updates).then(() => {
      if (data.count < TRANSACTIONS_LIMIT) {
        return null;
      }

      // recursively sync until we are done
      return transactionSync.call(this, { params: { next_id: data.next_id } });
    });
  }

  return Promise.bind(this).then(getLatest).then(sendRequest).then(saveToRedis);
}

transactionSync.transports = [ActionTransport.amqp];

module.exports = transactionSync;