oaeproject/Hilary

View on GitHub
packages/oae-activity/lib/internal/aggregator.js

Summary

Maintainability
B
4 hrs
Test Coverage
A
96%
/*!
 * Copyright 2014 Apereo Foundation (AF) Licensed under the
 * Educational Community License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may
 * obtain a copy of the License at
 *
 *     http://opensource.org/licenses/ECL-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import { format } from 'node:util';
import _ from 'underscore';
import { equals, pipe, sort, map, join, values } from 'ramda';

import { logger } from 'oae-logger';
import { telemetry } from 'oae-telemetry';

import { Activity, ActivityEntity } from 'oae-activity/lib/model.js';
import { ActivityConstants } from 'oae-activity/lib/constants.js';

import * as ActivityUtil from 'oae-activity/lib/util.js';
import * as ActivityRegistry from './registry.js';
import * as ActivitySystemConfig from './config.js';
import ActivityEmitter from './emitter.js';
import * as ActivityDAO from './dao.js';
import * as ActivityBuckets from './buckets.js';

// Logger for this module, created under the name `oae-activity-aggregator`
const log = logger('oae-activity-aggregator');

// Predicate made by partially applying ramda's `equals` to `0`. Used during collection to detect that there were no
// queued activities left to delete (i.e., the bucket is empty)
const isZero = equals(0);
// Telemetry handle created with the `activity` name. Used to record collection/delivery durations and counters
const Telemetry = telemetry('activity');

// Used in an aggregate key to denote that there was no entity provided for an activity. This differs from an empty string in that
// an empty string is used when the aggregate does not pivot on that entity.
const ENTITY_KEY_EMPTY = '__null__';

/**
 * Reset the aggregation process for the given activity streams. The reset itself is delegated to the DAO. When it
 * succeeds, a `RESET_AGGREGATION` event is emitted with the activity stream ids. When it fails, the error is logged
 * and handed to the callback and no event is emitted.
 *
 * @param  {String[]}   activityStreamIds   The activity streams whose aggregation should be reset
 * @param  {Function}   [callback]          Standard callback function. Defaults to a no-op when not provided
 * @param  {Object}     [callback.err]      An error that occurred, if any
 */
const resetAggregationForActivityStreams = function (activityStreamIds, callback) {
  // Callers may omit the callback, in which case the outcome is simply dropped
  const done = callback || (() => {});

  ActivityDAO.resetAggregationForActivityStreams(activityStreamIds, (error) => {
    if (!error) {
      // Let listeners know that aggregation was reset for these streams
      ActivityEmitter.emit(ActivityConstants.events.RESET_AGGREGATION, activityStreamIds);
      return done();
    }

    log().error({ err: error, activityStreamIds }, 'Failed to reset aggregation for activity streams');
    return done(error);
  });
};

/**
 * Perform a full collection of all activity buckets. If any bucket is already locked by another process, it will be skipped. When
 * this process completes and the callback is invoked, it will guarantee that:
 *
 * a) This process was not allowed to start another collection cycle, as there were too many occurring; or
 * b) for every bucket that wasn't locked, it was collected until it was empty.
 *
 * This function is most useful for unit tests to ensure that all activities up until a point in time have been aggregated and delivered.
 *
 * The actual iteration over the buckets is delegated to `ActivityBuckets.collectAllBuckets`, which is handed the
 * `activity` bucket type, the bucket settings from the activity system configuration and `_collectBucket` as the
 * function that collects a single bucket.
 *
 * @param  {Function}   [callback]      Invoked when collection is complete
 * @param  {Object}     [callback.err]  An error that occurred, if any
 */
const collectAllBuckets = function (callback) {
  // Read the configuration once so that all three settings are taken from the same configuration snapshot
  const { numberOfProcessingBuckets, maxConcurrentCollections, collectionExpiry } = ActivitySystemConfig.getConfig();

  ActivityBuckets.collectAllBuckets(
    'activity',
    numberOfProcessingBuckets,
    maxConcurrentCollections,
    collectionExpiry,
    _collectBucket,
    callback
  );
};

/**
 * Collect and process one batch of routed activities from the given bucket, aggregate them and deliver the resulting
 * activities to their routes. This method is *not safe* in the sense that it does not try to acquire a lock on the
 * bucket itself. Do not use this directly; in this file it is only handed to `ActivityBuckets.collectAllBuckets`
 * (see `collectAllBuckets`), which is expected to take care of bucket locking.
 *
 * Collecting and processing activities in this method goes through the following steps (the numbers match the
 * `Step #N` comments in the function body):
 *
 *  1.  Get the next batch of queued activities in the bucket. The number of activities processed is determined by `limit`,
 *      which is the configured `collectionBatchSize`. If there is nothing queued, the callback is invoked right away with
 *      `(null, true)` to report that the bucket is empty.
 *
 *  2.  Delete the queued activities that were just fetched so that they don't get reprocessed. This is done up-front, before
 *      any aggregation, so that bad data in the queue cannot permanently inhibit future collection rounds. As a consequence,
 *      nothing in this function re-queues the batch if one of the later steps fails.
 *
 *  3.  Expand the activities retrieved into all the potential "aggregate keys" that it could match. When expanding, a
 *      preliminary process of aggregation occurs where we collect aggregates within just the set of activities we
 *      fetched from the queue. See `createAggregates` for more information.
 *
 *  4.  Get the status of all aggregate keys we expanded. This helps us identify which aggregates are "active" (i.e., have
 *      received matching activities and have not expired). It also helps us identify expired aggregates, in which case we
 *      can delete all their aggregated data (status and entities). For more information on expiry, see `_isExpired`.
 *
 *  5.  For all expired aggregates, delete their aggregate data.
 *
 *  6.  For all *active* aggregates, fetch all the aggregated entities that are stored for them. This ensures that the activity's
 *      current history can carry forward when this aggregate is redelivered.
 *
 *  7.  Identify which aggregates will be delivered as activities. Aggregates that will be delivered are determined by:
 *
 *          *   Identify which activities belong to "multi-aggregates". A multi-aggregate is an aggregate that has aggregated at least
 *              two distinct activities. Mark it to be delivered (i.e., the activity becomes "claimed"). In this case, the
 *              `lastActivity` id of the aggregate status is recorded so it may be deleted (because it was replaced with the
 *              updated aggregate)
 *          *   For all activities that weren't claimed by a multi-aggregate, identify the ones that belong to a "single-aggregate". A
 *              single-aggregate is an aggregate that only contains a single activity of data. The single-aggregate then becomes merged
 *              with the new activity. Merging the new activity with a single-aggregate will now make it a multi-aggregate if the new
 *              activity contains at least one entity that did not belong to the single-aggregate. The previous activity is deleted
 *              in the same way as with multi-aggregates
 *          *   Identify which activities do not belong to any active aggregate (no single- or multi-aggregate exists for it) and mark
 *              them to be delivered, but only once per route. This new aggregate is now a "single-aggregate" which can now claim a
 *              matching activity at the single-aggregate priority if a matching activity occurs within the expiry time
 *
 *  8.  Save all the new aggregate entities that were found in the collected batch of queued activities. This ensures that when
 *      the next activity comes along and matches those aggregates, that historical information is there to carry forward.
 *
 *  9.  Iterate over all the aggregates that are marked for delivery, merge in the aggregated entities fetched in step #6, and
 *      generate the activity object based on their aggregated entities and metadata (publish date, verb, activityType, etc...)
 *
 *  10. Persist the new activities to their routes
 *
 *  11. Delete all the activities that were being tracked as active aggregates. We recorded which activities need to be deleted
 *      during step #7
 *
 *  12. Update the status of all aggregates. At the very least to indicate they were just touched by an activity. For aggregates that
 *      actually resulted in a delivered activity, they will also be given a `lastActivity` id that will be used to delete the
 *      activity we just delivered if a new activity matches the aggregate
 *
 * After step #12, a `DELIVERED_ACTIVITIES` event is emitted (when there is anything to report) and telemetry about the
 * collection is recorded. An error in any of the DAO calls aborts the chain and is passed to the callback.
 *
 * @param  {Number}    bucketNumber        The bucket to process.
 * @param  {Function}  callback            Standard callback function
 * @param  {Object}    callback.err        An error that occurred, if any
 * @param  {Boolean}   callback.empty      `true` when there were no queued activities to process. Not provided when a batch was processed
 * @api private
 */
const _collectBucket = function (bucketNumber, callback) {
  const collectionStart = Date.now();
  const limit = ActivitySystemConfig.getConfig().collectionBatchSize;
  log().trace('Collecting batch of %s entries from bucket number %s.', limit, bucketNumber);

  // Step #1: Get the next batch of queued activities to process
  ActivityDAO.getQueuedActivities(bucketNumber, limit, (error, routedActivities, numberToDelete) => {
    if (error) return callback(error);

    // No more to process, so stop and report that we're empty (the second callback argument `true` flags the empty bucket)
    if (isZero(numberToDelete)) return callback(null, true);

    /**
     * Step #2: Delete the queued activities that we're processing. We do
     * this right away because if there is any bad data, we want to make
     * sure that the existence of that data does not permanently inhibit
     * future collection rounds, so we need to ensure we pull that data out
     * regardless if aggregation is successful
     */
    ActivityDAO.deleteQueuedActivities(bucketNumber, numberToDelete, (error_) => {
      if (error_) return callback(error_);

      // Step #3: Explode the routed activities into their potential aggregates, according to their configured pivot points
      const allAggregates = createAggregates(_.values(routedActivities));
      const allAggregateKeys = _.keys(allAggregates);

      // Step #4: Get all aggregate statuses to determine which ones are expired and which are active. Expired aggregates
      // should be deleted, while active aggregates should be merged and redelivered
      ActivityDAO.getAggregateStatus(allAggregateKeys, (error, statusByAggregateKey) => {
        if (error) return callback(error);

        // Figure out which aggregates are "active" (have an activity in its aggregate) and "expired" (no new activities in the aggregate before expiry time).
        // Aggregates without any status are neither: they are brand new
        const activeAggregates = {};
        const expiredAggregates = {};
        _.each(allAggregateKeys, (aggregateKey) => {
          const status = statusByAggregateKey[aggregateKey];
          if (status && _isExpired(status, allAggregates[aggregateKey].published)) {
            expiredAggregates[aggregateKey] = true;
          } else if (status) {
            activeAggregates[aggregateKey] = true;
          }
        });

        // Note: We need to delete aggregated entities and save them here within the collection chain to avoid nuking undelivered
        // entities. If we saved aggregated entities during the routing phase and only deleted them here, it would save us a write
        // as we wouldn't have to write them to the queue, but it exposes a race condition where entities that are saved between
        // getAggregateStatus (above) and deleteAggregateData (below) will be deleted before delivery.

        // Step #5: Delete all the expired aggregates before aggregating new stuff
        ActivityDAO.deleteAggregateData(_.keys(expiredAggregates), (error_) => {
          if (error_) {
            return callback(error_);
          }

          // Step #6: Retrieve all entities that are aggregated within the active aggregates so they can be collected into redelivered activities
          ActivityDAO.getAggregatedEntities(_.keys(activeAggregates), (error, fetchedEntities) => {
            if (error) return callback(error);

            /*!
             * Step #7:
             *
             * Here we choose which aggregates need to be wrapped up into an activity and delivered to the activity stream. This is
             * rather difficult to get right. These are the rules implemented below:
             *
             *  For a given activity:
             *
             *  1.  If a matching multi-aggregate exists for an activity, it will "claim" the activity for that stream and redeliver
             *      the updated aggregate, while deleting the old version of the activity. If more than one multi-aggregate matches
             *      the activity, all matching multi-aggregates are redelivered. This is to support the situation where:
             *
             *          Aggregate #1: "Branden followed Simon and Bert"
             *          Aggregate #2: "Nicolaas and Stuart followed Stephen"
             *
             *      Now, when Branden follows Stephen, both of those "multi-aggregates" will claim this activity, as such:
             *
             *          Aggregate #1': "Branden followed Simon, Bert and Stephen"
             *          Aggregate #2': "Nicolaas, Stuart and Branden followed Stephen"
             *
             * 2.   For all activities that haven't been claimed, if a single-aggregate exists for an activity, it will "claim" the
             *      activity for that stream and redeliver the updated aggregate, while deleting the old version of the activity. If
             *      more than one single-aggregate matches the activity, all matching single-aggregates are redelivered. This is to
             *      support the situation where:
             *
             *          Aggregate #1: "Branden followed Simon"
             *          Aggregate #2: "Nicolaas followed Stephen"
             *
             *      Now, when Branden followed Stephen, both of those "single-aggregates" will claim this activity and become "multi-
             *      aggregates", as such:
             *
             *          Aggregate #1: "Branden followed Simon and Stephen"
             *          Aggregate #2: "Nicolaas and Branden followed Stephen"
             *
             * 3.   An activity is only delivered for an inactive aggregate if the activity was not "claimed" for the route by an
             *      active single- or multi-aggregate. This would make sure that we don't redeliver an active aggregate, AND deliver
             *      a new single-aggregate (e.g., "Branden shared Syllabus with OAE Team") for the same route.
             *
             * 4.   If no active aggregates claim an activity, and there are multiple inactive aggregates (e.g., the activity type has
             *      multiple "pivot points"), then one single activity is delivered for all of them. This is necessary to ensure that
             *      the "lastActivityId" is recorded properly for both aggregates, so if either of those inactive aggregates become
             *      active later (i.e., another activity comes along and matches it), the previous activity can be properly deleted by
             *      either of the aggregates.
             *
             *  FIXMEMAYBE: https://github.com/oaeproject/Hilary/pull/650#issuecomment-23865585
             *
             */

            // Keeps track of all aggregate keys that should actually be delivered. Not all potential aggregates get delivered
            // because other aggregates may take priority or they may be duplicates of existing activities
            const aggregatesToDeliver = {};

            // When a new activity is delivered that aggregates with an existing activity, the existing activity gets deleted
            // and replaced with a newer aggregate that represents them both. This hash keeps track of each activity that
            // should be deleted for replacement. It is keyed by route, then by activity id
            const activitiesToDelete = {};

            // Keeps track of which activities have already been "claimed" by an aggregate so that an activity doesn't get
            // delivered twice. It is keyed by route, then by activity id
            const claimedRouteActivities = {};

            // Keeps track of how many *new* activities have been delivered to a route. Multiple activities that aggregate into
            // the same activity are considered "new" activities in a route
            const numberNewActivitiesByRoute = {};

            // First, give an opportunity for all active "multi-aggregates" to claim the routed activity. That is to say
            // aggregates who actually have aggregated two or more activities. This indicates they would have clobbered any
            // "single-aggregates" that are their predecessors
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];
              const isMultiAggregateInQueue = aggregate.activityIds.length > 1;
              const isMultiAggregateInFeed =
                fetchedEntities[aggregateKey] && !_isSingleAggregate(fetchedEntities[aggregateKey]);

              // If the activity is already a multi-aggregate in the routed activity queue, or it is a multi-aggregate living in
              // the aggregates cache, we will claim them here with top priority
              if (isMultiAggregateInQueue || isMultiAggregateInFeed) {
                const status = statusByAggregateKey[aggregateKey];

                // Mark this to be delivered and assign it an activity id
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = ActivityDAO.createActivityId(
                  aggregate.published
                );

                // Mark these activities for this route as being claimed by an active aggregate
                claimedRouteActivities[aggregate.route] = claimedRouteActivities[aggregate.route] || {};
                _.each(aggregate.activityIds, (activityId) => {
                  claimedRouteActivities[aggregate.route][activityId] = true;
                });

                if (status && status.lastActivity) {
                  // If this was previously delivered, delete the previous activity
                  activitiesToDelete[aggregate.route] = activitiesToDelete[aggregate.route] || {};
                  activitiesToDelete[aggregate.route][status.lastActivity] = true;
                } else if (isMultiAggregateInQueue && !isMultiAggregateInFeed) {
                  // If this aggregate is aggregating with an activity in the queue (=in-memory aggregation)
                  // but NOT with activities already delivered to the feed, it means multiple activities
                  // were launched in quick succession (content-create for example) that could be aggregated
                  // into one single activity. This increments the number of new activities for this route by 1
                  numberNewActivitiesByRoute[aggregate.route] = numberNewActivitiesByRoute[aggregate.route] || 0;
                  numberNewActivitiesByRoute[aggregate.route]++;
                }
              }
            });

            // Second, give an opportunity for all active "single-aggregates" to claim the routed activity. That is to say
            // aggregates who are actually just a single activity that has happened, and no other activities have "joined"
            // them yet
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];

              // We know this activity only has 1 activity id now, because all aggregates with multiple activity ids would
              // have already been claimed as multi-aggregates
              const activityId = aggregate.activityIds[0];
              const isClaimed =
                claimedRouteActivities[aggregate.route] && claimedRouteActivities[aggregate.route][activityId];
              if (!isClaimed && activeAggregates[aggregateKey]) {
                const status = statusByAggregateKey[aggregateKey];

                // Mark this to be delivered and assign it an activity id
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = ActivityDAO.createActivityId(
                  aggregate.published
                );

                // Mark these activities for this route as being claimed by an active aggregate
                claimedRouteActivities[aggregate.route] = claimedRouteActivities[aggregate.route] || {};
                _.each(aggregate.activityIds, (activityId) => {
                  claimedRouteActivities[aggregate.route][activityId] = true;
                });

                if (status && status.lastActivity) {
                  // If this was previously delivered, delete the previous activity
                  activitiesToDelete[aggregate.route] = activitiesToDelete[aggregate.route] || {};
                  activitiesToDelete[aggregate.route][status.lastActivity] = true;
                }
              }
            });

            // Lastly, for aggregates that are not even active (i.e., they are brand new aggregates, no match on
            // any recent activities), determine if they can be delivered
            const incrementedForActivities = {};
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];

              // We know this activity only has 1 activity id now, because all aggregates with multiple activity ids would
              // have already been claimed as multi-aggregates
              const activityId = aggregate.activityIds[0];

              const isClaimed =
                claimedRouteActivities[aggregate.route] && claimedRouteActivities[aggregate.route][activityId];
              if (!isClaimed) {
                // If this route has not received an aggregate, then we deliver the non-active one(s). In the event that
                // there are multiple non-active aggregates, a duplicate activity will not be fired because we flatten and
                // maintain a set while generating activities later.
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = activityId;

                // When delivering single non-active aggregates, it's possible that we might deliver 2 aggregates to the
                // same route. To ensure that we do not increment the count more than once for an activity, we flatten
                // the aggregate into a unique string that identifies the activity it represents. This way we can keep
                // track of whether an activity already incremented the notification count
                const flattenedActivity = _flattenActivity(aggregate);
                if (!incrementedForActivities[flattenedActivity]) {
                  numberNewActivitiesByRoute[aggregate.route] = numberNewActivitiesByRoute[aggregate.route] || 0;
                  numberNewActivitiesByRoute[aggregate.route]++;
                  incrementedForActivities[flattenedActivity] = true;
                }
              }
            });

            // Step #8: Save the aggregated entities stored in the current batch of aggregates
            ActivityDAO.saveAggregatedEntities(allAggregates, (error__) => {
              if (error__) {
                return callback(error__);
              }

              // Step #9: Create the actual activities to route
              let numberDelivered = 0;
              // Maps a flattened activity to the activity id it was delivered with, so duplicates can be detected
              const visitedActivities = {};
              // The activities to deliver, keyed by route and then by activity id
              const activityStreamUpdates = {};
              _.each(aggregatesToDeliver, (aggregateToDeliver, aggregateKey) => {
                const aggregate = allAggregates[aggregateKey];

                // Construct the activities to deliver
                const activityType = aggregate[ActivityConstants.properties.OAE_ACTIVITY_TYPE];
                const { published, verb } = aggregate;

                // Refresh the entities with the freshly fetched set, which has all the entities, not those just in this collection
                // We need to make sure we override with the queued entities and not the freshly fetched ones since they may have been
                // updated since original aggregation.
                if (fetchedEntities[aggregateKey]) {
                  aggregate.addActors(fetchedEntities[aggregateKey].actors);
                  aggregate.addObjects(fetchedEntities[aggregateKey].objects);
                  aggregate.addTargets(fetchedEntities[aggregateKey].targets);
                }

                // Make sure that we don't deliver an identical activity to the same stream twice. This can potentially
                // happen when an activity type has multiple pivots that were inactive prior to this activity (e.g., content-share)
                let activityId = null;
                const flattenedActivity = _flattenActivity(aggregate);
                if (visitedActivities[flattenedActivity]) {
                  // We assign the previous activity id to the aggregate so that we can update the aggregate status to know that
                  // any new activities for this aggregate should replace its existing activity
                  aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = visitedActivities[flattenedActivity];
                  return;
                }

                // This activity is not a duplicate, assign and record a new activityId. Note that this replaces the
                // activity id that was assigned to the aggregate while choosing the aggregates to deliver in step #7
                activityId = ActivityDAO.createActivityId(aggregate.published);
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = activityId;
                visitedActivities[flattenedActivity] = activityId;

                // Create the entities for the delivered activity
                const actor = createActivityEntity(_.values(aggregate.actors));
                const object = createActivityEntity(_.values(aggregate.objects));
                const target = createActivityEntity(_.values(aggregate.targets));

                activityStreamUpdates[aggregate.route] = activityStreamUpdates[aggregate.route] || {};
                activityStreamUpdates[aggregate.route][activityId] = new Activity(
                  activityType,
                  activityId,
                  verb,
                  published,
                  actor,
                  object,
                  target
                );
                numberDelivered++;
              });

              // Step #10: Deliver the new activities to the streams
              ActivityDAO.deliverActivities(activityStreamUpdates, (error___) => {
                if (error___) {
                  return callback(error___);
                }

                // Collection date is marked as the date/time that the aggregate gets delivered
                const collectionDate = Date.now();

                // Record how long it took for these to be delivered. Note that the `routedActivities` parameter below is the
                // per-route hash of delivered activities and shadows the queued `routedActivities` of the outer scope
                _.each(activityStreamUpdates, (routedActivities) => {
                  _.each(routedActivities, (activity) => {
                    Telemetry.appendDuration('delivery.time', activity.published);
                  });
                });

                // The activitiesToDelete hash values should actually be arrays of unique activity ids, not "<activity id>: true" pairs.
                _.each(activitiesToDelete, (activityToDelete, route) => {
                  activitiesToDelete[route] = _.keys(activitiesToDelete[route]);
                });

                // Step #11: Delete the old activities that were replaced by aggregates
                ActivityDAO.deleteActivities(activitiesToDelete, (error___) => {
                  if (error___) {
                    return callback(error___);
                  }

                  // Determine how to update all the aggregate statuses. Every aggregate in the batch gets an update, not
                  // just the ones that were delivered
                  const statusUpdatesByActivityStreamId = {};
                  _.each(allAggregateKeys, (aggregateKey) => {
                    const aggregate = allAggregates[aggregateKey];
                    statusUpdatesByActivityStreamId[aggregate.route] =
                      statusUpdatesByActivityStreamId[aggregate.route] || {};
                    statusUpdatesByActivityStreamId[aggregate.route][aggregateKey] = {
                      lastUpdated: aggregate.published,
                      lastCollected: collectionDate
                    };

                    if (!activeAggregates[aggregateKey]) {
                      // This aggregate was not previously active, so mark its creation date at the beginning of the first activity
                      statusUpdatesByActivityStreamId[aggregate.route][aggregateKey].created = aggregate.published;
                    }

                    // Mark the last activity for each aggregate. This ensures that when a new activity gets added to the aggregate, we can
                    // delete the previous one.
                    if (aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID]) {
                      statusUpdatesByActivityStreamId[aggregate.route][aggregateKey].lastActivity =
                        aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID];
                    }
                  });

                  // Step #12: Update the activity statuses, indicating they have just been updated and collected, where applicable
                  ActivityDAO.indexAggregateData(statusUpdatesByActivityStreamId, (error___) => {
                    if (error___) {
                      return callback(error___);
                    }

                    // Fire an event that we have successfully delivered these individual activities. The info is keyed by
                    // resource id and then by stream type. The `numNewActivities` count is only set when the first routed
                    // activity for a resource/stream pair is encountered, later ones are just appended to `activities`
                    const deliveredActivityInfos = {};
                    _.each(routedActivities, (routedActivity) => {
                      const activityStream = ActivityUtil.parseActivityStreamId(routedActivity.route);
                      const { streamType, resourceId } = activityStream;
                      deliveredActivityInfos[resourceId] = deliveredActivityInfos[resourceId] || {};
                      deliveredActivityInfos[resourceId][streamType] = deliveredActivityInfos[resourceId][
                        streamType
                      ] || {
                        numNewActivities: numberNewActivitiesByRoute[routedActivity.route] || 0,
                        activities: []
                      };

                      deliveredActivityInfos[resourceId][streamType].activities.push(routedActivity.activity);
                    });

                    if (!_.isEmpty(deliveredActivityInfos)) {
                      ActivityEmitter.emit(ActivityConstants.events.DELIVERED_ACTIVITIES, deliveredActivityInfos);
                    }

                    // Record how long this collection took, how many queued activities went in and how many activities came out
                    Telemetry.appendDuration('collection.time', collectionStart);
                    Telemetry.incr('collected.count', _.size(routedActivities));
                    Telemetry.incr('delivered.count', numberDelivered);

                    return callback();
                  });
                });
              });
            });
          });
        });
      });
    });
  });
};

/**
 * Explode the given routed activities into all potential aggregates. An aggregate is a permutation of a routed activity that
 * further keys each by the pivot points by which the activity can be aggregated over a period of time.
 *
 * The routed activities are an array of the following form:
 *
 * [
 *  {
 *      'route': <route>,
 *      'activity': <Activity>
 *  },
 *  { ... }
 * ]
 *
 * Where the route specifies the route to which the activity should be delivered, and the activity is the activity to deliver.
 *
 * The result will be an object representing the aggregation of all the routed activities in the list, keyed by the aggregate key. An
 * example aggregation of an activity that pivots on actor and had 3 matching aggregates in the array would be:
 *
 *  {
 *       '<aggregateKey>': {
 *           'route': '...',
 *           'oae:activityType': '...',
 *           'activityIds': [ '<activityId0>', ... ],
 *           'verb': '...',
 *           'published': '...',
 *           'actors': {
 *               '<actorKey0>': <ActivityEntity (actor)>
 *           },
 *           'objects': {
 *               '<objectKey0>': <ActivityEntity (object)>,
 *               '<objectKey1>': <ActivityEntity (object)>,
 *               '<objectKey2>': <ActivityEntity (object)>
 *           },
 *           'targets': {}
 *       }
 *  }
 *
 *  @see #ActivityAggregate model object for more details
 *
 * @param  {Object[]}  routedActivities    An array of activities along with the route to which they should be delivered. See summary for more information
 * @return {Object}                        An object representing the potential aggregates of the collected batch of activities
 */
const createAggregates = function (routedActivities) {
  const aggregates = {};
  _.each(routedActivities, (routedActivity) => {
    // A routedActivity could be null if the contents failed to parse (corrupt?). Just skip over it.
    if (!routedActivity) {
      return;
    }

    const { activity, route } = routedActivity;
    const activityType = activity[ActivityConstants.properties.OAE_ACTIVITY_TYPE];
    const activityId = activity[ActivityConstants.properties.OAE_ACTIVITY_ID];

    // Build the entity keys which will be used to create the aggregate key
    const actorKey = _createEntityKey(activity.actor);
    const objectKey = _createEntityKey(activity.object);
    const targetKey = _createEntityKey(activity.target);

    // Determine how this activity will be grouped (a.k.a., pivot points) for aggregation
    const activityTypes = ActivityRegistry.getRegisteredActivityTypes();
    let groupBy = activityTypes[activityType] ? activityTypes[activityType].groupBy : [];

    // Ensure we atleast have the "all" aggregate, which means we don't get duplicate activities within the same aggregation
    // period
    if (_.isEmpty(groupBy)) {
      groupBy = [
        {
          actor: true,
          object: true,
          target: true
        }
      ];
    }

    // For each potential grouping, create an "aggregate key", which will be used to determine if new activity deliveries
    // match with the same key
    _.each(groupBy, (pivot) => {
      const pivotActorKey = _createPivotKey(activity.actor, pivot.actor);
      const pivotObjectKey = _createPivotKey(activity.object, pivot.object);
      const pivotTargetKey = _createPivotKey(activity.target, pivot.target);

      // The aggregate key is of the following format: "content-create#u:oae:mrvisser#user:u:oae:mrvisser##
      const aggregateKey = format('%s#%s#%s#%s#%s', activityType, route, pivotActorKey, pivotObjectKey, pivotTargetKey);

      // This process of collecting actors, objects, targets and activities is in some respect "in-memory aggregation". It
      // helps to use this to determine ahead of time if there are a few activities within this batch of routed activities
      // that already match. It helps us deliver an aggregate right away to the route, rather than accidentally delivering
      // individual activities from within a batch
      if (!aggregates[aggregateKey]) {
        aggregates[aggregateKey] = new ActivityAggregate(activityType, route, activity.verb, activity.published);
      }

      const aggregate = aggregates[aggregateKey];

      // Below, we suppress the aggregate only if it does not contribute any new entity to an existing aggregate. This
      // allows us to avoid aggregating exact duplicates from within the batch and making it look like it is a live
      // aggregate

      const hasNew = _contributesNewEntity(aggregate, actorKey, objectKey, targetKey);

      if (activity.actor) {
        aggregate.updateActor(actorKey, activity.actor);
      }

      if (activity.object) {
        aggregate.updateObject(objectKey, activity.object);
      }

      if (activity.target) {
        aggregate.updateTarget(targetKey, activity.target);
      }

      // Ensure we record the most recent occurance of an activity
      aggregate.published = activity.published;

      // Only make this aggregate look like an active aggregate if a second one actually contributed a new entity
      if (hasNew) {
        aggregate.activityIds.push(activityId);
      }
    });
  });

  return aggregates;
};

/**
 * Create the pivot key for the given entity according to the specified pivot spec (e.g., groupBy)
 *
 * The pivot spec is interpreted as follows:
 *
 *  * falsy:    the aggregate does not pivot on this entity, the key is the empty string
 *  * `true`:   pivot on the identity of the entity (see `_createEntityKey`)
 *  * String:   pivot on the value of the entity property with that name (e.g., resource type or visibility)
 *  * Function: the function receives the entity and returns the pivot key
 *
 * If the resulting key is not a string, a warning is logged and the empty string is returned. This includes the case
 * where a property-name pivot spec is used while the entity is missing.
 *
 * @param  {ActivityEntity}             entity          The entity for which to create the pivot key. May be unspecified when the activity has no such entity
 * @param  {Boolean|String|Function}    [pivotSpec]     The pivot spec in the activity registry of this activity which describes how to pivot
 * @return {String}                                     The pivot key that tells the aggregator how to aggregate on activities that contain this entity
 * @api private
 */
const _createPivotKey = function (entity, pivotSpec) {
  let key = '';
  if (pivotSpec) {
    if (pivotSpec === true) {
      // When the pivot spec for an entity is true, we default to the identity of the entity
      // which is what people want most of the time
      key = _createEntityKey(entity);
    } else if (_.isString(pivotSpec)) {
      // If we get a string, we treat it as the key of the entity to aggregate on. This is
      // useful to collect entities in different ways rather than pivoting on a unique entity
      // (e.g., we can collect on resource type, or on visibility, etc...). The entity may be
      // absent (e.g., an activity without a target), in which case there is no property to read
      // and we fall through to the non-string handling below instead of throwing
      key = entity ? entity[pivotSpec] : undefined;
    } else if (_.isFunction(pivotSpec)) {
      // When given a function for the pivot spec, we give full control for the caller to
      // determine the pivot key based on the entity
      key = pivotSpec(entity);
    }
  }

  // The key ends up inside a formatted aggregate key, so anything but a string is rejected
  if (!_.isString(key)) {
    log().warn(
      {
        entity,
        key,
        pivotSpec
      },
      'Entity resulted in non-string pivot key'
    );
    return '';
  }

  return key;
};

/**
 * Tell whether the given actor, object and target keys would add at least one entity that the aggregate does not hold yet.
 *
 * @param  {Object}     aggregate           The aggregate to check
 * @param  {Object}     aggregate.actors    An object keyed by the actor key whose values are the actor activity entities
 * @param  {Object}     aggregate.objects   An object keyed by the object key whose values are the object activity entities
 * @param  {Object}     aggregate.targets   An object keyed by the target key whose values are the target activity entities
 * @param  {String}     actorKey            The key of the actor to look up in the aggregate
 * @param  {String}     objectKey           The key of the object to look up in the aggregate
 * @param  {String}     targetKey           The key of the target to look up in the aggregate
 * @return {Boolean}                        `true` when at least one of the keys has no entity in the aggregate, `false` when all three are already present
 * @api private
 */
const _contributesNewEntity = function (aggregate, actorKey, objectKey, targetKey) {
  const { actors, objects, targets } = aggregate;

  // Nothing new is contributed only when every one of the three keys already maps to an entity
  const allAlreadyPresent = actors[actorKey] && objects[objectKey] && targets[targetKey];
  return !allAlreadyPresent;
};

/**
 * Tell whether the given aggregate stands for a single item, i.e., it holds at most one actor, at most one object and at most
 * one target.
 *
 * @param  {Object}     aggregate           The aggregate to check
 * @param  {Object}     aggregate.actors    An object keyed by the actor key whose values are the actor activity entities
 * @param  {Object}     aggregate.objects   An object keyed by the object key whose values are the object activity entities
 * @param  {Object}     aggregate.targets   An object keyed by the target key whose values are the target activity entities
 * @return {Boolean}                        Whether or not this aggregate represents a single activity
 * @api private
 */
const _isSingleAggregate = function (aggregate) {
  const holdsAtMostOne = (entitiesByKey) => _.keys(entitiesByKey).length <= 1;

  return holdsAtMostOne(aggregate.actors) && holdsAtMostOne(aggregate.objects) && holdsAtMostOne(aggregate.targets);
};

/**
 * Turn an array of activity entities into the single top-level activity entity that should appear in an activity stream.
 *
 *  * No entities (unspecified or empty array) result in `undefined`
 *  * A single entity is returned as-is
 *  * Multiple entities are wrapped in a new entity of object type `collection` that carries the original entities in its `ext`
 *
 * @param  {ActivityEntity[]}  entities        The activity entities to transform.
 * @return {ActivityEntity}                    An individual activity entity that represents the collection of entities.
 */
const createActivityEntity = function (entities) {
  const count = entities ? entities.length : 0;
  if (!count) {
    return undefined;
  }

  if (count === 1) {
    return entities[0];
  }

  // More than one entity: expose them together as a "collection" entity
  const ext = { [ActivityConstants.properties.OAE_COLLECTION]: entities };
  return new ActivityEntity('collection', undefined, undefined, { ext });
};

/**
 * Flatten an aggregate into a string identity that allows us to determine if the activity that will be created by an aggregate
 * is identical to another. This can be used to maintain a hash of identities to quickly determine whether or not an activity
 * should be delivered.
 *
 * The entity keys of each collection are sorted, so the identity does not depend on the order in which entities were added to
 * the aggregate.
 *
 * @param  {Object}    aggregate   The aggregate from which to derive an activity identity
 * @return {String}                A string identity that can be used to determine if one activity is identical to another
 * @api private
 */
const _flattenActivity = function (aggregate) {
  const { route } = aggregate;
  const activityType = aggregate[ActivityConstants.properties.OAE_ACTIVITY_TYPE];

  // Entity keys are strings, so they must be ordered with a string comparison. A numeric `a - b` comparator yields NaN for
  // strings, which sort treats as "equal" and therefore leaves the keys in insertion order
  const byCodeUnit = (a, b) => {
    if (a < b) {
      return -1;
    }

    if (a > b) {
      return 1;
    }

    return 0;
  };

  // Create a multi-key of all the actors, objects and targets so they are deterministic
  const flatten = pipe(values, map(_createEntityKey), sort(byCodeUnit), join(','));
  const actorsKeys = flatten(aggregate.actors);
  const objectsKeys = flatten(aggregate.objects);
  const targetsKeys = flatten(aggregate.targets);

  /**
   * Generate the identity key for the activity described by the aggregate, in the form
   * <activityType>#<route>#<actorKeys>#<objectKeys>#<targetKeys>. It looks like:
   * content-create#u:oae:mrvisser#user:u:oae:mrvisser#content:c:oae:PVOsdf43j,content:c:oae:jfEIop-#
   */
  return format('%s#%s#%s#%s#%s', activityType, route, actorsKeys, objectsKeys, targetsKeys);
};

/**
 * Build a string that identifies the given entity, in the form `<objectType>:<id>`. Looks something like: user:u:oae:mrvisser
 *
 * When no entity is given, the `ENTITY_KEY_EMPTY` placeholder is returned instead.
 *
 * @param  {ActivityEntity}     entity  The entity for which to create an entity key.
 * @return {String}                     A unique string representation of the entity.
 */
const _createEntityKey = function (entity) {
  if (!entity) {
    return ENTITY_KEY_EMPTY;
  }

  const { objectType } = entity;
  const id = entity[ActivityConstants.properties.OAE_ID];
  return format('%s:%s', objectType, id);
};

/**
 * Determine if the aggregate described by aggregateStatus is considered to be expired at the provided published date.
 *
 * An aggregate is reported as expired when its last update has been collected by a collection routine (i.e., `lastCollected` is
 * set and is later than `lastUpdated`), and at least one of the following holds:
 *
 *  a) More than the configured `aggregateIdleExpiry` seconds lie between the last update and the published date; or
 *  b) More than the configured `aggregateMaxExpiry` seconds lie between the creation of the aggregate and the published date.
 *
 * @param  {Object}     aggregateStatus     The aggregate status entry, holding the `created`, `lastUpdated` and `lastCollected` timestamps
 * @param  {Number}     published           The published date (in millis since the epoch) that the next activity occurred.
 * @return {Boolean}                        Truthy when the aggregate is expired. When `lastCollected` is falsy, that falsy value itself is returned
 */
const _isExpired = function (aggregateStatus, published) {
  // The expiry settings are configured in seconds while all timestamps are in milliseconds
  const idleLimitInMs = ActivitySystemConfig.getConfig().aggregateIdleExpiry * 1000;
  const maxLimitInMs = ActivitySystemConfig.getConfig().aggregateMaxExpiry * 1000;
  const { lastCollected, lastUpdated, created } = aggregateStatus;

  // An update that has not been collected yet can never be expired
  const wasCollected = lastCollected && lastCollected > lastUpdated;
  if (!wasCollected) {
    return wasCollected;
  }

  const idleForTooLong = published - lastUpdated > idleLimitInMs;
  const aliveForTooLong = published - created > maxLimitInMs;
  return idleForTooLong || aliveForTooLong;
};

/// /////////////////
// INTERNAL MODEL //
/// /////////////////

/**
 * A model object that represents the data associated to multiple activities for the same route aggregated together.
 *
 * The returned object exposes the activity type (under the `OAE_ACTIVITY_TYPE` property name), `route`, `verb`, `published`,
 * the list of `activityIds` and the `actors`, `objects` and `targets` hashes keyed by entity key, along with methods to
 * update those hashes.
 *
 * @param  {String}     activityType    The type of the activities that were aggregated together
 * @param  {String}     route           The destination route for the activities that were aggregated together
 * @param  {String}     verb            The verb of the activities that were aggregated together
 * @param  {Number}     published       The latest timestamp (millis since the epoch) of the activities that were aggregated together
 * @api private
 */
const ActivityAggregate = function (activityType, route, verb, published) {
  const that = {};
  that[ActivityConstants.properties.OAE_ACTIVITY_TYPE] = activityType;
  that.route = route;
  that.verb = verb;
  that.published = published;
  that.activityIds = [];
  that.actors = {};
  that.objects = {};
  that.targets = {};

  /*!
   * Update the existing (if any) actor in the aggregate with the given actor. If the actor did not exist on the aggregate it will
   * be added.
   *
   * @param  {String}     actorKey    The unique key of the actor entity
   * @param  {Object}     actor       The actor entity to store
   */
  that.updateActor = function (actorKey, actor) {
    that.actors[actorKey] = actor;
  };

  /*!
   * Update the existing (if any) object in the aggregate with the given object. If the object did not exist on the aggregate it
   * will be added.
   *
   * @param  {String}     objectKey   The unique key of the object entity
   * @param  {Object}     object      The object entity to store
   */
  that.updateObject = function (objectKey, object) {
    that.objects[objectKey] = object;
  };

  /*!
   * Update the existing (if any) target in the aggregate with the given target. If the target did not exist on the aggregate it
   * will be added.
   *
   * @param  {String}     targetKey   The unique key of the target entity
   * @param  {Object}     target      The target entity to store
   */
  that.updateTarget = function (targetKey, target) {
    that.targets[targetKey] = target;
  };

  /*!
   * Add the given hash of actors to the current set of actors. If any actors in the given set are already contained, they
   * are not added/updated to the current set of actors. The given hash itself is not modified.
   *
   * @param  {Object}     actors  An object, keyed by the unique entity key, whose value is the actor to add to the current set of actors
   */
  that.addActors = function (actors) {
    if (actors) {
      // Merge into a fresh object so the caller's hash is left untouched. The current actors are applied last so they win
      that.actors = _.extend({}, actors, that.actors);
    }
  };

  /*!
   * Add the given hash of objects to the current set of objects. If any objects in the given set are already contained, they
   * are not added/updated to the current set of objects. The given hash itself is not modified.
   *
   * @param  {Object}     objects  An object, keyed by the unique entity key, whose value is the object to add to the current set of objects
   */
  that.addObjects = function (objects) {
    if (objects) {
      // Merge into a fresh object so the caller's hash is left untouched. The current objects are applied last so they win
      that.objects = _.extend({}, objects, that.objects);
    }
  };

  /*!
   * Add the given hash of targets to the current set of targets. If any targets in the given set are already contained, they
   * are not added/updated to the current set of targets. The given hash itself is not modified.
   *
   * @param  {Object}    targets  An object, keyed by the unique entity key, whose value is the target to add to the current set of targets
   */
  that.addTargets = function (targets) {
    if (targets) {
      // Merge into a fresh object so the caller's hash is left untouched. The current targets are applied last so they win
      that.targets = _.extend({}, targets, that.targets);
    }
  };

  return that;
};

export { resetAggregationForActivityStreams, collectAllBuckets, createAggregates, createActivityEntity };