hackedteam/rcs-db

View on GitHub
lib/rcs-db/db_objects/aggregate.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'mongoid'
require 'set'
require 'rcs-common/trace'

require_relative '../position/proximity'
require_relative '../target_scoped'

class Aggregate
  include Mongoid::Document
  include RCS::TargetScoped
  include RCS::Tracer
  include RCS::DB::Proximity
  extend RCS::Tracer

  field :aid, type: String                      # agent BSON_ID
  field :day, type: String                      # day of aggregation
  field :type, type: Symbol
  field :count, type: Integer, default: 0
  field :size, type: Integer, default: 0        # seconds for calls, bytes for the others
  field :info, type: Array, default: []         # for summary or timeframe (position)
  field :data, type: Hash, default: {}

  index({aid: 1}, {background: true})
  index({type: 1}, {background: true})
  index({day: 1}, {background: true})
  index({"data.peer" => 1}, {background: true})
  index({"data.type" => 1}, {background: true})
  index({"data.host" => 1}, {background: true})
  index({type: 1, "data.peer" => 1 }, {background: true})
  index({'data.position' => "2dsphere"}, {background: true})

  shard_key :type, :day, :aid

  scope :positions, where(type: :position)

  # The "day" attribute must be a string in the format of YYYYMMDD
  # or the string "0" (when the type if :postioner or :summary)
  validates_format_of :day, :with => /\A(\d{8}|0)\z/
  # Valalidates the presence of the shard key attributes
  validates_presence_of :type, :day, :aid

  def to_point
    raise "not a position" unless type.eql? :position
    time_params = (info.last.symbolize_keys rescue nil) || {}
    Point.new time_params.merge(lat: data['position'][1], lon: data['position'][0], r: data['radius'])
  end

  def position
    {latitude: self.data['position'][1], longitude: self.data['position'][0], radius: self.data['radius']}
  end

  def self.aggregate_type_to_handle_type(aggregate_type)
    t = aggregate_type.to_sym

    if [:phone, :sms, :mms].include?(t)
      'phone'
    elsif [:mail, :gmail, :outlook].include?(t)
      'mail'
    else
      "#{t}"
    end
  end

  def entity_handle_type
     Aggregate.aggregate_type_to_handle_type(type)
  end

  def target_id
    # The class method #target_id is added by TargetScoped
    self.class.target_id
  end

  def add_to_intelligence_queue
    IntelligenceQueue.add(target_id, id, :aggregate)
  end

  def self.create_collection
    # ensure indexes
    create_indexes
    # enable sharding only if not enabled
    RCS::DB::Shard.set_key(collection, {type: 1, day: 1, aid: 1}) unless RCS::DB::Shard.sharded?(collection)
  end

  def self.versus_of_communications_with(handle)
    trace :debug, "Determining communication versus between target #{@target_id} and the handle #{handle.handle.inspect} (#{handle.type.inspect})"

    directions = self.in(:type => handle.aggregate_types).where('data.peer' => handle.handle).only('data.versus').distinct('data.versus')

    if directions.size == 1
      directions.first.to_sym
    elsif directions.size == 2
      :both
    else
      nil
    end
  end

  # Extracts the most visited urls for a given target (within a timeframe).
  # Params accepted are "from", "to" (in the form of yyyymmdd strings) and "limit" (integer).
  # @example Aggregate.most_visited_urls(target._id, 'from' => '20130103', 'to' => '20140502').
  def self.most_visited_urls(target_id, params = {})
    match = {:type => :url}
    match[:day] = {'$gte' => params['from'], '$lte' => params['to']} if params['from'] and params['to']
    limit = params['num'] || 5
    group = {_id: "$data.host", count: {"$sum" => "$count"}}

    pipeline = [{"$match" => match}, {"$group" => group}, {"$sort" => {count: -1}}, {"$limit" => limit.to_i}]

    results = Aggregate.target(target_id).collection.aggregate(pipeline)

    # Rename the "_id" key to "host" and adds the "percent" key
    total = results.inject(0) { |num, hash| num += hash["count"]; num }

    results.each do |hash|
      hash["host"] = hash["_id"]
      hash.delete("_id")
      hash["percent"] = ((hash["count"].to_f / total.to_f)*100).round(1)
    end

    results
  end

  def self.most_visited_places(target_id, params = {})
    match = {type: :position}
    match[:day] = {'$gte' => params['from'], '$lte' => params['to']} if params['from'] and params['to']
    limit = params['num'] || 5
    group = {_id: '$data.position', count: {"$sum" => "$count"}, radius: {"$min" => "$data.radius"}}
    project = {position: '$_id', _id: 0, count: 1, radius: 1}

    pipeline = [{"$match" => match}, {"$group" => group}, {"$sort" => {count: -1}}, {"$project" => project}, {"$limit" => limit.to_i}]
    results = Aggregate.target(target_id).collection.aggregate(pipeline)

    return results if results.empty?

    positions = results.map { |hash| hash['position'] }

    operation_id = ::Item.find(target_id).path.first

    positions2entities = Entity.path_include(operation_id).positions.in(position: positions).only(:position, :name).inject({}) do |hash, entity|
      hash[entity.position] = {'_id' => entity.id, 'name' => entity.name}
      hash
    end

    results.map! do |hash|
      entity = positions2entities[hash['position']]
      hash['entity'] = entity if entity
      hash
    end
  end

  def self.most_contacted(target_id, params)
    start = Time.now
    most_contacted_types = [:chat, :mail, :sms, :mms, :facebook,
                            :gmail, :skype, :bbm, :whatsapp, :msn, :adium,
                            :viber, :outlook, :wechat, :line, :phone]

    # mongoDB aggregation framework

    match = {:type => {'$in' => most_contacted_types}}
    match[:day] = {'$gte' => params['from'], '$lte' => params['to']} if params['from'] and params['to']

    pipeline = [{ "$match" => match },
                { "$group" =>
                  { _id: { peer: "$data.peer", type: "$type" },
                    count: { "$sum" => "$count" },
                    size: { "$sum" => "$size" },
                  }
                }]

    time = Time.now
    # extract the results
    contacted = Aggregate.target(target_id).collection.aggregate(pipeline)

    trace :debug, "Most contacted: Aggregation time #{Time.now - time}" if RCS::DB::Config.instance.global['PERF']

    # normalize them in a better form
    contacted.collect! {|e| {peer: e['_id']['peer'], type: e['_id']['type'], count: e['count'], size: e['size']}}

    # group them by type
    group = contacted.to_set.classify {|e| e[:type]}.values

    # sort can be 'count' or 'size'
    sort_by = params['sort'].to_sym if params['sort']
    sort_by ||= :count

    limit = params['num'].to_i - 1 if params['num']
    limit ||= 4

    # sort the most contacted and cut the first N (also calculate the percentage)
    top = group.collect do |set|
      total = set.inject(0) {|sum, e| sum + e[sort_by]}
      next if total == 0
      set.each {|e| e[:percent] = (e[sort_by] * 100 / total).round(1)}
      set.sort {|x,y| x[sort_by] <=> y[sort_by]}.reverse.slice(0..limit)
    end

    time = Time.now

    # resolve the names of the peer from the db of entities
    top.each do |t|
      t.each do |e|
        e[:peer_name] = Entity.name_from_handle(e[:type], e[:peer], target_id)
        e.delete(:peer_name) unless e[:peer_name]
      end
    end

    trace :debug, "Most contacted: Resolv time #{Time.now - time}" if RCS::DB::Config.instance.global['PERF']

    return top
  end
end