// packages/oae-activity/lib/internal/aggregator.js
/*!
* Copyright 2014 Apereo Foundation (AF) Licensed under the
* Educational Community License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://opensource.org/licenses/ECL-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
import { format } from 'node:util';
import _ from 'underscore';
import { equals, pipe, sort, map, join, values } from 'ramda';
import { logger } from 'oae-logger';
import { telemetry } from 'oae-telemetry';
import { Activity, ActivityEntity } from 'oae-activity/lib/model.js';
import { ActivityConstants } from 'oae-activity/lib/constants.js';
import * as ActivityUtil from 'oae-activity/lib/util.js';
import * as ActivityRegistry from './registry.js';
import * as ActivitySystemConfig from './config.js';
import ActivityEmitter from './emitter.js';
import * as ActivityDAO from './dao.js';
import * as ActivityBuckets from './buckets.js';
// Logger scoped to this aggregator module
const log = logger('oae-activity-aggregator');
// Predicate that is true when its argument is exactly 0
const isZero = equals(0);
// Telemetry client used to record collection/delivery counts and durations
const Telemetry = telemetry('activity');
// Used in an aggregate key to denote that there was no entity provided for an activity. This differs from an empty string in that
// an empty string is used when the aggregate does not pivot on that entity.
const ENTITY_KEY_EMPTY = '__null__';
/**
* Resets the aggregation process for a set of activity streams.
*
* @param {String[]} activityStreamIds The set of activity streams that need to have their aggregation reset
* @param {Function} callback Standard callback function
* @param {Object} callback.err An error that occurred, if any
*/
const resetAggregationForActivityStreams = function (activityStreamIds, callback) {
  // The callback may be omitted for fire-and-forget resets
  if (!callback) {
    callback = function () {};
  }

  ActivityDAO.resetAggregationForActivityStreams(activityStreamIds, (error) => {
    if (error) {
      log().error({ err: error, activityStreamIds }, 'Failed to reset aggregation for activity streams');
      return callback(error);
    }

    // Notify listeners (e.g., tests) that aggregation was reset for these streams
    ActivityEmitter.emit(ActivityConstants.events.RESET_AGGREGATION, activityStreamIds);
    return callback();
  });
};
/**
* Perform a full collection of all activity buckets. If any bucket is already locked by another process, it will be skipped. When
* this process completes and the callback is invoked, it will guarantee that:
*
* a) This process was not allowed to start another collection cycle, as there were too many occuring; or
* b) for every bucket that wasn't locked, it was collected until it was empty.
*
* This function is most useful for unit tests to ensure that all activities up until a point in time have been aggregated and delivered.
*
* @param {Function} [callback] Invoked when collection is complete
* @param {Object} [callback.err] An error that occurred, if any
*/
const collectAllBuckets = function (callback) {
  const config = ActivitySystemConfig.getConfig();

  // Delegate to the generic bucket collector, which handles locking, skipping buckets
  // that are locked by other processes, and the concurrent-collection limit
  ActivityBuckets.collectAllBuckets(
    'activity',
    config.numberOfProcessingBuckets,
    config.maxConcurrentCollections,
    config.collectionExpiry,
    _collectBucket,
    callback
  );
};
/**
* Collect and process a certain amount of routed activities from the given bucket. This method is *not safe* in the sense
* that it does not try and first acquire a lock on the bucket. Do not use this directly, instead use `collectBucket` which
* in turn uses this method with locks.
*
* Collecting and processing activities in this method goes through the following steps:
*
* 1. Get the next batch of queued activities in the bucket. The number of activities processed is determined by `limit`.
*
* 2. Expand the activities retrieved into all the potential "aggregate keys" that it could match. When expanding, a
* preliminary process of aggregation occurs where we collect aggregates within just the set of activities we
* fetched from the queue. See `createAggregates` for more information.
*
* 3. Get the status of all aggregate keys we expanded. This helps us identify which aggregates are "active" (i.e., have
* received matching activities and have not expired). It also helps us identify expired aggregates, in which case we
* can delete all their aggregated data (status and entities).
*
* 4. For all expired aggregates, delete their status entry and all their previously aggregated entities. For more information
* on expiry, see `_isExpired`.
*
* 5. For all *active* aggregates, fetch all the aggregated entities that are stored for them. This ensures that the activities
* current history can carry forward when this aggregate is redelivered.
*
* 6. Merge all the aggregates with their aggregated entities fetched in step #5, and identify which aggregates will be
* delivered as activities. Aggregates that will be delivered are determined by:
*
* * Identify which activities belong to "multi-aggregates". A multi-aggregate is an aggregate that has aggregated atleast
* two distinct activities. Merge the data and mark it to be delivered (i.e., the activity becomes "claimed"). In this
* case, the `lastActivity` id of the aggregate status is recorded so it may be deleted (because it was replaced with the
* updated aggregate)
* * For all activities that weren't claimed by a multi-aggregate, identify the ones that belong to a "single-aggregate". A
* single-aggregate is an aggregate that only contains a single activity of data. The single-aggregate then becomes merged
* with the new activity. Merging the new activity with a single-aggregate will now make it a multi-aggregate if the new
* activity contains at least one entity that did not belong to the single-aggregate. The previous activity is deleted
* in the same way as with multi-aggregates
* * Identify which activities do not belong to any active aggregate (no single- or multi-aggregate exists for it) and mark
* them to be delivered, but only once per route. This new aggregate is now a "single-aggregate" which can now claim a
* matching activity at the single-aggregate priority if a matching activity occurs within the expiry time
*
* 7. Save all the new aggregate entities that were found in the collected batch of queued activities. This ensures that when
* the next activity comes along and matches those aggregates, that historical information is there to carry forward.
*
* 8. Iterate over all the aggregates that are marked for delivery, and generate the activity object based on their aggregated
* entities and metadata (publish date, verb, activityType, etc...)
*
* 9. Persist the new activities to their routes
*
* 10. Delete all the activities that were being tracked as active aggregates. We recorded which activities need to be deleted
* during step #6
*
* 11. Update the status of all aggregates. At the very least to indicate they were just touched by an activity. For aggregates that
* actually resulted in a delivered activity, they will also be given a `lastActivity` id that will be used to delete the
* activity we just delivered if a new activity matches the aggregate
*
* 12. Delete the queued activities that we just processed so that they don't get reprocessed.
*
* @param {Number} bucketNumber The bucket to process.
* @param {Function} callback Standard callback function
* @param {Object} callback.err An error that occurred, if any
* @api private
*/
const _collectBucket = function (bucketNumber, callback) {
  // Used to report the total duration of this collection cycle to telemetry at the end
  const collectionStart = Date.now();
  const limit = ActivitySystemConfig.getConfig().collectionBatchSize;
  log().trace('Collecting batch of %s entries from bucket number %s.', limit, bucketNumber);
  // Step #1: Get the next batch of queued activities to process
  ActivityDAO.getQueuedActivities(bucketNumber, limit, (error, routedActivities, numberToDelete) => {
    if (error) return callback(error);
    // No more to process, so stop and report that we're empty. The `true` second argument
    // signals to the bucket collector that this bucket has been fully drained
    if (isZero(numberToDelete)) return callback(null, true);
    /**
     * Step #2: Delete the queued activities that we're processing. We do
     * this right away because if there is any bad data, we want to make
     * sure that the existence of that data does not permanently inhibit
     * future collection rounds, so we need to ensure we pull that data out
     * regardless if aggregation is successful
     */
    ActivityDAO.deleteQueuedActivities(bucketNumber, numberToDelete, (error_) => {
      if (error_) return callback(error_);
      // Step #3: Explode the routed activities into their potential aggregates, according to their configured pivot points
      const allAggregates = createAggregates(_.values(routedActivities));
      const allAggregateKeys = _.keys(allAggregates);
      // Step #4: Get all aggregate statuses to determine which ones are expired and which are active. Expired aggregates
      // should be deleted, while active aggregates should be merged and redelivered
      ActivityDAO.getAggregateStatus(allAggregateKeys, (error, statusByAggregateKey) => {
        if (error) return callback(error);
        // Figure out which aggregates are "active" (have an activity in its aggregate) and "expired" (no new activities in the aggregate before expiry time)
        const activeAggregates = {};
        const expiredAggregates = {};
        _.each(allAggregateKeys, (aggregateKey) => {
          const status = statusByAggregateKey[aggregateKey];
          // Aggregates with no status at all are brand new: neither active nor expired
          if (status && _isExpired(status, allAggregates[aggregateKey].published)) {
            expiredAggregates[aggregateKey] = true;
          } else if (status) {
            activeAggregates[aggregateKey] = true;
          }
        });
        // Note: We need to delete aggregated entities and save them here within the collection chain to avoid nuking undelivered
        // entities. If we saved aggregated entities during the routing phase and only deleted them here, it would save us a write
        // as we wouldn't have to write them to the queue, but it exposes a race condition where entities that are saved between
        // getAggregateStatus (above) and deleteAggregateData (below) will be deleted before delivery.
        // Step #5: Delete all the expired aggregates before aggregating new stuff
        ActivityDAO.deleteAggregateData(_.keys(expiredAggregates), (error_) => {
          if (error_) {
            return callback(error_);
          }
          // Step #6: Retrieve all entities that are aggregated within the active aggregates so they can be collected into redelivered activities
          ActivityDAO.getAggregatedEntities(_.keys(activeAggregates), (error, fetchedEntities) => {
            if (error) return callback(error);
            /*!
             * Step #7:
             *
             * Here we choose which aggregates need to be wrapped up into an activity and delivered to the activity stream. This is
             * rather difficult to get right. These are the rules implemented below:
             *
             * For a given activity:
             *
             *  1. If a matching multi-aggregate exists for an activity, it will "claim" the activity for that stream and redeliver
             *     the updated aggregate, while deleting the old version of the activity. If more than one multi-aggregate matches
             *     the activity, all matching multi-aggregates are redelivered. This is to support the situation where:
             *
             *         Aggregate #1: "Branden followed Simon and Bert"
             *         Aggregate #2: "Nicolaas and Stuart followed Stephen"
             *
             *     Now, when Branden follows Stephen, both of those "multi-aggregates" will claim this activity, as such:
             *
             *         Aggregate #1': "Branden followed Simon, Bert and Stephen"
             *         Aggregate #2': "Nicolaas, Stuart and Branden followed Stephen"
             *
             *  2. For all activities that haven't been claimed, if a single-aggregate exists for an activity, it will "claim" the
             *     activity for that stream and redeliver the updated aggregate, while deleting the old version of the activity. If
             *     more than one single-aggregate matches the activity, all matching single-aggregates are redelivered. This is to
             *     support the situation where:
             *
             *         Aggregate #1: "Branden followed Simon"
             *         Aggregate #2: "Nicolaas followed Stephen"
             *
             *     Now, when Branden followed Stephen, both of those "single-aggregates" will claim this activity and become "multi-
             *     aggregates", as such:
             *
             *         Aggregate #1: "Branden followed Simon and Stephen"
             *         Aggregate #2: "Nicolaas and Branden followed Stephen"
             *
             *  3. An activity is only delivered for an inactive aggregate if the activity was not "claimed" for the route by an
             *     active single- or multi-aggregate. This would make sure that we don't redeliver an active aggregate, AND deliver
             *     a new single-aggregate (e.g., "Branden shared Syllabus with OAE Team") for the same route.
             *
             *  4. If no active aggregates claim an activity, and there are multiple inactive aggregates (e.g., the activity type has
             *     multiple "pivot points"), then one single activity is delivered for all of them. This is necessary to ensure that
             *     the "lastActivityId" is recorded properly for both aggregates, so if either of those inactive aggregates become
             *     active later (i.e., another activity comes along and matches it), the previous activity can be properly deleted by
             *     either of the aggregates.
             *
             * FIXMEMAYBE: https://github.com/oaeproject/Hilary/pull/650#issuecomment-23865585
             *
             */
            // Keeps track of all aggregate keys that should actually be delivered. Not all potential aggregates get delivered
            // because other aggregates may take priority or they may be duplicates of existing activities
            const aggregatesToDeliver = {};
            // When a new activity is delivered that aggregates with an existing activity, the existing activity gets deleted
            // and replaced with a newer aggregate that represents them both. This hash keeps track of each activity that
            // should be deleted for replacement
            const activitiesToDelete = {};
            // Keeps track of which activities have already been "claimed" by an aggregate so that an activity doesn't get
            // delivered twice
            const claimedRouteActivities = {};
            // Keeps track of how many *new* activities have been delivered to a route. Multple activities that aggregate into
            // the same activity are considered "new" activities in a route
            const numberNewActivitiesByRoute = {};
            // First, give an opportunity for all active "multi-aggregates" to claim the routed activity. That is to say
            // aggregates who actually have aggregated two or more activities. This indicates they would have clobbered any
            // "single-aggregates" that are their predecessors
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];
              const isMultiAggregateInQueue = aggregate.activityIds.length > 1;
              const isMultiAggregateInFeed =
                fetchedEntities[aggregateKey] && !_isSingleAggregate(fetchedEntities[aggregateKey]);
              // If the activity is already a multi-aggregate in the routed activity queue, or it is a multi-aggregate living in
              // the aggregates cache, we will claim them here with top priority
              if (isMultiAggregateInQueue || isMultiAggregateInFeed) {
                const status = statusByAggregateKey[aggregateKey];
                // Mark this to be delivered and assign it an activity id
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = ActivityDAO.createActivityId(
                  aggregate.published
                );
                // Mark these activities for this route as being claimed by an active aggregate
                claimedRouteActivities[aggregate.route] = claimedRouteActivities[aggregate.route] || {};
                _.each(aggregate.activityIds, (activityId) => {
                  claimedRouteActivities[aggregate.route][activityId] = true;
                });
                if (status && status.lastActivity) {
                  // If this was previously delivered, delete the previous activity
                  activitiesToDelete[aggregate.route] = activitiesToDelete[aggregate.route] || {};
                  activitiesToDelete[aggregate.route][status.lastActivity] = true;
                } else if (isMultiAggregateInQueue && !isMultiAggregateInFeed) {
                  // If this aggregate is aggregating with an activity in the queue (=in-memory aggregation)
                  // but NOT with activities already delivered to the feed, it means multiple activities
                  // were launched in quick successesion (content-create for example) that could be aggregated
                  // into one single activity. This increments the number of new activities for this route by 1
                  numberNewActivitiesByRoute[aggregate.route] = numberNewActivitiesByRoute[aggregate.route] || 0;
                  numberNewActivitiesByRoute[aggregate.route]++;
                }
              }
            });
            // Second, give an opportunity for all active "single-aggregates" to claim the routed activity. That is to say
            // aggregates who are actually just a single activity that has happened, and no other activities have "joined"
            // them yet
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];
              // We know this activity only has 1 activity id now, because all aggregates with multiple activity ids would
              // have already been claimed as multi-aggregates
              const activityId = aggregate.activityIds[0];
              const isClaimed =
                claimedRouteActivities[aggregate.route] && claimedRouteActivities[aggregate.route][activityId];
              if (!isClaimed && activeAggregates[aggregateKey]) {
                const status = statusByAggregateKey[aggregateKey];
                // Mark this to be delivered and assign it an activity id
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = ActivityDAO.createActivityId(
                  aggregate.published
                );
                // Mark these activities for this route as being claimed by an active aggregate
                claimedRouteActivities[aggregate.route] = claimedRouteActivities[aggregate.route] || {};
                _.each(aggregate.activityIds, (activityId) => {
                  claimedRouteActivities[aggregate.route][activityId] = true;
                });
                if (status && status.lastActivity) {
                  // If this was previously delivered, delete the previous activity
                  activitiesToDelete[aggregate.route] = activitiesToDelete[aggregate.route] || {};
                  activitiesToDelete[aggregate.route][status.lastActivity] = true;
                }
              }
            });
            // Lastly, for aggregates that are not even active (i.e., they are brand new aggregates, no match on
            // any recent activities), determine if they can be delivered
            const incrementedForActivities = {};
            _.each(allAggregateKeys, (aggregateKey) => {
              const aggregate = allAggregates[aggregateKey];
              // We know this activity only has 1 activity id now, because all aggregates with multiple activity ids would
              // have already been claimed as multi-aggregates
              const activityId = aggregate.activityIds[0];
              const isClaimed =
                claimedRouteActivities[aggregate.route] && claimedRouteActivities[aggregate.route][activityId];
              if (!isClaimed) {
                // If this route has not received an aggregate, then we deliver the non-active one(s). In the event that
                // there are multiple non-active aggregates, a duplicate activity will not be fired because we flatten and
                // maintain a set while generating activities later.
                aggregatesToDeliver[aggregateKey] = true;
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = activityId;
                // When delivering single non-active aggregates, it's possible that we might deliver 2 aggregates to the
                // same route. To ensure that we do not increment the count more than once for an activity, we flatten
                // the aggregate into a unique string that identifies the activity it represents. This way we can keep
                // track of whether an activity already incremented the notification count
                const flattenedActivity = _flattenActivity(aggregate);
                if (!incrementedForActivities[flattenedActivity]) {
                  numberNewActivitiesByRoute[aggregate.route] = numberNewActivitiesByRoute[aggregate.route] || 0;
                  numberNewActivitiesByRoute[aggregate.route]++;
                  incrementedForActivities[flattenedActivity] = true;
                }
              }
            });
            // Step #8: Save the aggregated entities stored in the current batch of aggregates
            ActivityDAO.saveAggregatedEntities(allAggregates, (error__) => {
              if (error__) {
                return callback(error__);
              }
              // Step #9: Create the actual activities to route
              let numberDelivered = 0;
              const visitedActivities = {};
              const activityStreamUpdates = {};
              _.each(aggregatesToDeliver, (aggregateToDeliver, aggregateKey) => {
                const aggregate = allAggregates[aggregateKey];
                // Construct the activities to deliver
                const activityType = aggregate[ActivityConstants.properties.OAE_ACTIVITY_TYPE];
                const { published, verb } = aggregate;
                // Refresh the entities with the freshly fetched set, which has all the entities, not those just in this collection
                // We need to make sure we override with the queued entities and not the freshly fetched ones since they may have been
                // updated since original aggregation.
                if (fetchedEntities[aggregateKey]) {
                  aggregate.addActors(fetchedEntities[aggregateKey].actors);
                  aggregate.addObjects(fetchedEntities[aggregateKey].objects);
                  aggregate.addTargets(fetchedEntities[aggregateKey].targets);
                }
                // Make sure that we don't deliver an identical activity to the same stream twice. This can potentially
                // happen when an activity type has multiple pivots that were inactive prior to this activity (e.g., content-share)
                let activityId = null;
                const flattenedActivity = _flattenActivity(aggregate);
                if (visitedActivities[flattenedActivity]) {
                  // We assign the previous activity id to the aggregate so that we can update the aggregate status to know that
                  // any new activities for this aggregate should replace its existing activity
                  aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = visitedActivities[flattenedActivity];
                  return;
                }
                // This activity is not a duplicate, assign and record a new activityId
                activityId = ActivityDAO.createActivityId(aggregate.published);
                aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID] = activityId;
                visitedActivities[flattenedActivity] = activityId;
                // Create the entities for the delivered activity
                const actor = createActivityEntity(_.values(aggregate.actors));
                const object = createActivityEntity(_.values(aggregate.objects));
                const target = createActivityEntity(_.values(aggregate.targets));
                activityStreamUpdates[aggregate.route] = activityStreamUpdates[aggregate.route] || {};
                activityStreamUpdates[aggregate.route][activityId] = new Activity(
                  activityType,
                  activityId,
                  verb,
                  published,
                  actor,
                  object,
                  target
                );
                numberDelivered++;
              });
              // Step #10: Deliver the new activities to the streams
              ActivityDAO.deliverActivities(activityStreamUpdates, (error___) => {
                if (error___) {
                  return callback(error___);
                }
                // Collection date is marked as the date/time that the aggregate gets delivered
                const collectionDate = Date.now();
                // Record how long it took for these to be delivered
                _.each(activityStreamUpdates, (routedActivities) => {
                  _.each(routedActivities, (activity) => {
                    Telemetry.appendDuration('delivery.time', activity.published);
                  });
                });
                // The activitiesToDelete hash values should actually be arrays of unique activity ids, not "<activity id>: true" pairs.
                _.each(activitiesToDelete, (activityToDelete, route) => {
                  activitiesToDelete[route] = _.keys(activitiesToDelete[route]);
                });
                // Step #11: Delete the old activities that were replaced by aggregates
                ActivityDAO.deleteActivities(activitiesToDelete, (error___) => {
                  if (error___) {
                    return callback(error___);
                  }
                  // Determine how to update all the aggregate statuses
                  const statusUpdatesByActivityStreamId = {};
                  _.each(allAggregateKeys, (aggregateKey) => {
                    const aggregate = allAggregates[aggregateKey];
                    statusUpdatesByActivityStreamId[aggregate.route] =
                      statusUpdatesByActivityStreamId[aggregate.route] || {};
                    statusUpdatesByActivityStreamId[aggregate.route][aggregateKey] = {
                      lastUpdated: aggregate.published,
                      lastCollected: collectionDate
                    };
                    if (!activeAggregates[aggregateKey]) {
                      // This aggregate was not previously active, so mark its creation date at the beginning of the first activity
                      statusUpdatesByActivityStreamId[aggregate.route][aggregateKey].created = aggregate.published;
                    }
                    // Mark the last activity for each aggregate. This ensures that when a new activity gets added to the aggregate, we can
                    // delete the previous one.
                    if (aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID]) {
                      statusUpdatesByActivityStreamId[aggregate.route][aggregateKey].lastActivity =
                        aggregate[ActivityConstants.properties.OAE_ACTIVITY_ID];
                    }
                  });
                  // Step #12: Update the activity statuses, indicating they have just been updated and collected, where applicable
                  ActivityDAO.indexAggregateData(statusUpdatesByActivityStreamId, (error___) => {
                    if (error___) {
                      return callback(error___);
                    }
                    // Fire an event that we have successfully delivered these individual activities
                    // NOTE(review): `routedActivities` here is the queued batch from Step #1, keyed such that
                    // each value carries `route` and `activity` — verified against its use in createAggregates
                    const deliveredActivityInfos = {};
                    _.each(routedActivities, (routedActivity) => {
                      const activityStream = ActivityUtil.parseActivityStreamId(routedActivity.route);
                      const { streamType, resourceId } = activityStream;
                      deliveredActivityInfos[resourceId] = deliveredActivityInfos[resourceId] || {};
                      deliveredActivityInfos[resourceId][streamType] = deliveredActivityInfos[resourceId][
                        streamType
                      ] || {
                        numNewActivities: numberNewActivitiesByRoute[routedActivity.route] || 0,
                        activities: []
                      };
                      deliveredActivityInfos[resourceId][streamType].activities.push(routedActivity.activity);
                    });
                    if (!_.isEmpty(deliveredActivityInfos)) {
                      ActivityEmitter.emit(ActivityConstants.events.DELIVERED_ACTIVITIES, deliveredActivityInfos);
                    }
                    Telemetry.appendDuration('collection.time', collectionStart);
                    Telemetry.incr('collected.count', _.size(routedActivities));
                    Telemetry.incr('delivered.count', numberDelivered);
                    return callback();
                  });
                });
              });
            });
          });
        });
      });
    });
  });
};
/**
* Explode the given routed activities into all potential aggregates. An aggregate is a permutation of a routed activity that
* further keys each by the pivot points by which the activity can be aggregated over a period of time.
*
* The routed activities are an array of following form:
*
* [
* {
* 'route': <route>,
* 'activity': <Activity>
* },
* { ... }
* ]
*
* Where the route specifies the route to which the activity should be delivered, and the activity is the activity to deliver.
*
* The result will be an object representing the aggregation of all the routed activities in the list, keyed by the aggregate key. An
* example aggregation of an activity that pivots on actor and had 3 matching aggregates in the array would be:
*
* {
* '<aggregateKey>': {
* 'route': '...',
* 'oae:activityType': '...',
* 'activityIds': [ '<activityId0>', ... ],
* 'verb': '...',
* 'published': '...',
* 'actors': {
* '<actorKey0>': <ActivityEntity (actor)>
* },
* 'objects': {
* '<objectKey0>': <ActivityEntity (object)>,
* '<objectKey1>': <ActivityEntity (object)>,
* '<objectKey2>': <ActivityEntity (object)>
* },
* 'targets': {}
* }
* }
*
* @see #ActivityAggregate model object for more details
*
* @param {Object[]} routedActivities An array of activities along with the route to which they should be delivered. See summary for more information
* @return {Object} An object representing the potential aggregates of the collected batch of activities
*/
const createAggregates = function (routedActivities) {
  const aggregates = {};

  for (const routedActivity of routedActivities) {
    // A routedActivity could be null if the contents failed to parse (corrupt?). Just skip over it
    if (!routedActivity) {
      continue;
    }

    const { activity, route } = routedActivity;
    const activityType = activity[ActivityConstants.properties.OAE_ACTIVITY_TYPE];
    const activityId = activity[ActivityConstants.properties.OAE_ACTIVITY_ID];

    // Entity identity keys from which the aggregate key will be derived
    const actorKey = _createEntityKey(activity.actor);
    const objectKey = _createEntityKey(activity.object);
    const targetKey = _createEntityKey(activity.target);

    // Look up how this activity type is grouped (a.k.a., its pivot points) for aggregation
    const registeredTypes = ActivityRegistry.getRegisteredActivityTypes();
    let groupBy = registeredTypes[activityType] ? registeredTypes[activityType].groupBy : [];

    // Fall back to the "all" pivot (actor + object + target) so that duplicate activities
    // within the same aggregation period still collapse into a single activity
    if (!groupBy || groupBy.length === 0) {
      groupBy = [
        {
          actor: true,
          object: true,
          target: true
        }
      ];
    }

    // For each potential grouping, derive an "aggregate key". Later activity deliveries that
    // produce the same key will aggregate with this one
    for (const pivot of groupBy) {
      const pivotActorKey = _createPivotKey(activity.actor, pivot.actor);
      const pivotObjectKey = _createPivotKey(activity.object, pivot.object);
      const pivotTargetKey = _createPivotKey(activity.target, pivot.target);

      // The aggregate key is of the following format: "content-create#u:oae:mrvisser#user:u:oae:mrvisser##
      const aggregateKey = format('%s#%s#%s#%s#%s', activityType, route, pivotActorKey, pivotObjectKey, pivotTargetKey);

      // This is "in-memory aggregation": activities within this batch that share an aggregate
      // key are folded together here, so a combined aggregate can be delivered right away
      // rather than accidentally delivering the batch's individual activities separately
      let aggregate = aggregates[aggregateKey];
      if (!aggregate) {
        aggregate = new ActivityAggregate(activityType, route, activity.verb, activity.published);
        aggregates[aggregateKey] = aggregate;
      }

      // Determine, *before* merging, whether this activity contributes any entity the aggregate
      // does not already hold. Exact duplicates within the batch must not make the aggregate
      // look like a live (multi-activity) aggregate
      const contributesNewEntity = _contributesNewEntity(aggregate, actorKey, objectKey, targetKey);

      if (activity.actor) {
        aggregate.updateActor(actorKey, activity.actor);
      }

      if (activity.object) {
        aggregate.updateObject(objectKey, activity.object);
      }

      if (activity.target) {
        aggregate.updateTarget(targetKey, activity.target);
      }

      // Ensure we record the most recent occurrence of an activity
      aggregate.published = activity.published;

      // Only record the activity id if this activity actually contributed a new entity
      if (contributesNewEntity) {
        aggregate.activityIds.push(activityId);
      }
    }
  }

  return aggregates;
};
/**
* Create the pivot key for the given entity according to the specified pivot spec (e.g., groupBy)
*
* @param {ActivityEntity} entity The entity for which to create the pivot key
* @param {Boolean|String|Function} [pivotSpec] The pivot spec in the activity registry of this activity which describes how to pivot
* @return {String} The pivot key that tells the aggregator how to aggregate on activities that contain this entity
* @api private
*/
const _createPivotKey = function (entity, pivotSpec) {
  let key = '';
  if (pivotSpec) {
    if (pivotSpec === true) {
      // When the pivot spec for an entity is true, we default to the identity of the entity
      // which is what people want most of the time
      key = _createEntityKey(entity);
    } else if (typeof pivotSpec === 'string') {
      // If we get a string, we treat it as the key of the entity to aggregate on. This is
      // useful to collect entities in different ways rather than pivoting on a unique entity
      // (e.g., we can collect on resource type, or on visibility, etc...)
      key = entity[pivotSpec];
    } else if (typeof pivotSpec === 'function') {
      // When given a function for the pivot spec, we give full control for the caller to
      // determine the pivot key based on the entity
      key = pivotSpec(entity);
    }
  }

  // Guard against specs that resolve to a non-string key (e.g., a missing entity property or a
  // pivot function that returned something else). Fall back to the empty ("don't pivot") key
  if (typeof key !== 'string') {
    log().warn(
      {
        entity,
        key,
        pivotSpec
      },
      'Entity resulted in non-string pivot key'
    );
    return '';
  }

  return key;
};
/**
 * Determines whether or not the given actor, object and target keys represent contribution of any new entities to this aggregate.
 *
 * @param {Object} aggregate The aggregate to check
 * @param {Object} aggregate.actors An object keyed by the actor key whose values are the actor activity entities
 * @param {Object} aggregate.objects An object keyed by the object key whose values are the object activity entities
 * @param {Object} aggregate.targets An object keyed by the target key whose values are the target activity entities
 * @param {String} actorKey The key of the actor to see if it contributes a new entity
 * @param {String} objectKey The key of the object to see if it contributes a new entity
 * @param {String} targetKey The key of the target to see if it contributes a new entity
 * @return {Boolean} Whether or not any of the given actor, object or target keys contribute a new entity
 * @api private
 */
const _contributesNewEntity = function (aggregate, actorKey, objectKey, targetKey) {
  // The aggregate gains a new entity when at least one of the keys is not already present
  const hasActor = Boolean(aggregate.actors[actorKey]);
  const hasObject = Boolean(aggregate.objects[objectKey]);
  const hasTarget = Boolean(aggregate.targets[targetKey]);
  return !(hasActor && hasObject && hasTarget);
};
/**
 * Determines whether or not the given aggregate represents a single aggregate item. That is to say that there is at most one actor,
 * object and target associated to it.
 *
 * @param {Object} aggregate The aggregate to check
 * @param {Object} aggregate.actors An object keyed by the actor key whose values are the actor activity entities
 * @param {Object} aggregate.objects An object keyed by the object key whose values are the object activity entities
 * @param {Object} aggregate.targets An object keyed by the target key whose values are the target activity entities
 * @return {Boolean} Whether or not this aggregate represents a single activity
 * @api private
 */
const _isSingleAggregate = function (aggregate) {
  const { actors, objects, targets } = aggregate;
  return [actors, objects, targets].every((entities) => Object.keys(entities).length <= 1);
};
/**
 * Given an array of activity entities, return a new top-level activity entity representing how it should be modeled in an activity
 * stream.
 *
 * @param {ActivityEntity[]} entities The activity entities to transform.
 * @return {ActivityEntity} An individual activity entity that represents the collection of entities. `undefined` when no entities are given.
 */
const createActivityEntity = function (entities) {
  // Nothing to model when there are no entities
  // eslint-disable-next-line unicorn/explicit-length-check
  if (!entities || !entities.length) {
    return undefined;
  }

  // A lone entity is modeled as itself
  if (entities.length === 1) {
    return entities[0];
  }

  // Multiple entities are wrapped into a single "collection" entity
  const ext = { [ActivityConstants.properties.OAE_COLLECTION]: entities };
  return new ActivityEntity('collection', undefined, undefined, { ext });
};
/**
 * Flatten an aggregate into a string identity that allows us to determine if the activity that will be created by an aggregate
 * is identical to another. This can be used to maintain a hash of identities to quickly determine whether or not an activity
 * should be delivered.
 *
 * @param {Object} aggregate The aggregate from which to deliver an activity identity
 * @return {String} A string identity that can be used to determine if one activity is identical to another
 * @api private
 */
const _flattenActivity = function (aggregate) {
  const { route } = aggregate;
  const activityType = aggregate[ActivityConstants.properties.OAE_ACTIVITY_TYPE];

  /**
   * Create a multi-key of all the actors, objects and targets. Entity keys are strings (e.g.,
   * "user:u:oae:mrvisser"), so they must be sorted lexicographically for the multi-key to be
   * deterministic regardless of object insertion order. The previous numeric comparator
   * `(a, b) => a - b` evaluated to NaN on strings, which `sort` treats as "equal" and therefore
   * left the keys unsorted
   */
  const byString = (a, b) => (a < b ? -1 : a > b ? 1 : 0);
  const flatten = pipe(values, map(_createEntityKey), sort(byString), join(','));
  const actorsKeys = flatten(aggregate.actors);
  const objectsKeys = flatten(aggregate.objects);
  const targetsKeys = flatten(aggregate.targets);

  /**
   * Generate the identity key for the activity described by the aggregate. It looks like:
   * content-create#u:oae:mrvisser#user:u:oae:mrvisser#c:oae:jfEIop-,c:oae:PVOsdf43j##
   */
  return format('%s#%s#%s#%s#%s', activityType, route, actorsKeys, objectsKeys, targetsKeys);
};
/**
 * Create a unique string representation from the given entity. Looks something like: user:u:oae:mrvisser
 *
 * If the entity is not specified, returns `ENTITY_KEY_EMPTY` as a placeholder for the entity key.
 *
 * @param {ActivityEntity} entity The entity for which to create an entity key.
 * @return {String} A unique string representation of the entity.
 */
const _createEntityKey = function (entity) {
  if (!entity) {
    return ENTITY_KEY_EMPTY;
  }

  return format('%s:%s', entity.objectType, entity[ActivityConstants.properties.OAE_ID]);
};
/**
 * Determine if the aggregate described by aggregateStatus is considered to be expired at the provided published date.
 *
 * An aggregate is expired when the last update made to the aggregate has been collected by a collection routine, and either:
 *
 * a) It has been longer than the configured `aggregateIdleExpiry` seconds since the last activity matched the aggregate; or
 * b) The aggregate has been active for longer than the configured `aggregateMaxExpiry` time.
 *
 * @param {Object} aggregateStatus The aggregate status entry.
 * @param {Number} published The published date (in millis since the epoch) that the next activity occurred.
 * @return {Boolean} Whether or not the aggregate is expired.
 * @api private
 */
const _isExpired = function (aggregateStatus, published) {
  // Resolve the configured expiries once; they are configured in seconds but compared in millis
  const config = ActivitySystemConfig.getConfig();
  const aggregateIdleExpiryInMs = config.aggregateIdleExpiry * 1000;
  const aggregateMaxExpiryInMs = config.aggregateMaxExpiry * 1000;

  const lastUpdateWasCollected =
    aggregateStatus.lastCollected && aggregateStatus.lastCollected > aggregateStatus.lastUpdated;
  const lastUpdateIsIdleExpired = published - aggregateStatus.lastUpdated > aggregateIdleExpiryInMs;
  const createdMaxIsExpired = published - aggregateStatus.created > aggregateMaxExpiryInMs;

  return lastUpdateWasCollected && (lastUpdateIsIdleExpired || createdMaxIsExpired);
};
/// /////////////////
// INTERNAL MODEL //
/// /////////////////
/**
 * A model object that represents the data associated to multiple activities for the same route aggregated together.
 *
 * @param {String} activityType The type of the activities that were aggregated together
 * @param {String} route The destination route for the activities that were aggregated together
 * @param {String} verb The verb of the activities that were aggregated together
 * @param {Number} published The latest timestamp (millis since the epoch) of the activities that were aggregated together
 * @api private
 */
const ActivityAggregate = function (activityType, route, verb, published) {
  const that = {};
  that[ActivityConstants.properties.OAE_ACTIVITY_TYPE] = activityType;
  that.route = route;
  that.verb = verb;
  that.published = published;

  // The ids of the activities that were folded into this aggregate
  that.activityIds = [];

  // The aggregated entities, each keyed by their unique entity key
  that.actors = {};
  that.objects = {};
  that.targets = {};

  /*!
   * Update the existing (if any) actor in the aggregate with the given actor. If the actor did not exist on the aggregate it will
   * be added.
   *
   * @param {String} actorKey The unique key of the actor entity
   * @param {Object} actor The actor entity to update
   */
  that.updateActor = function (actorKey, actor) {
    that.actors[actorKey] = actor;
  };

  /*!
   * Update the existing (if any) object in the aggregate with the given object. If the object did not exist on the aggregate it
   * will be added.
   *
   * @param {String} objectKey The unique key of the object entity
   * @param {Object} object The object entity to update
   */
  that.updateObject = function (objectKey, object) {
    that.objects[objectKey] = object;
  };

  /*!
   * Update the existing (if any) target in the aggregate with the given target. If the target did not exist on the aggregate it
   * will be added.
   *
   * @param {String} targetKey The unique key of the target entity
   * @param {Object} target The target entity to update
   */
  that.updateTarget = function (targetKey, target) {
    that.targets[targetKey] = target;
  };

  /*!
   * Add the given hash of actors to the current collection of actors. If any actors in the given set are already contained, they
   * are not added/updated in the current set of actors. The given `actors` object is not modified.
   *
   * @param {Object} actors An object, keyed by the unique entity key, whose value is the actor to add to the current set of actors
   */
  that.addActors = function (actors) {
    if (actors) {
      // Merge into a fresh object so the caller's `actors` argument is not mutated
      // (the previous `_.extend(actors, that.actors)` modified it in place)
      that.actors = { ...actors, ...that.actors };
    }
  };

  /*!
   * Add the given hash of objects to the current collection of objects. If any objects in the given set are already contained, they
   * are not added/updated in the current set of objects. The given `objects` object is not modified.
   *
   * @param {Object} objects An object, keyed by the unique entity key, whose value is the object to add to the current set of objects
   */
  that.addObjects = function (objects) {
    if (objects) {
      // Existing aggregate entities win over the incoming ones
      that.objects = { ...objects, ...that.objects };
    }
  };

  /*!
   * Add the given hash of targets to the current collection of targets. If any targets in the given set are already contained, they
   * are not added/updated in the current set of targets. The given `targets` object is not modified.
   *
   * @param {Object} targets An object, keyed by the unique entity key, whose value is the target to add to the current set of targets
   */
  that.addTargets = function (targets) {
    if (targets) {
      // Existing aggregate entities win over the incoming ones
      that.targets = { ...targets, ...that.targets };
    }
  };

  return that;
};
export { resetAggregationForActivityStreams, collectAllBuckets, createAggregates, createActivityEntity };