hackedteam/rcs-db

View on GitHub
lib/rcs-aggregator/position.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require_release 'rcs-db/position/resolver'

module RCS
module Aggregator

# Handle position aggregations
class PositionAggregator
  extend RCS::Tracer

  def self.minimum_time_in_a_position
    RCS::DB::Config.instance.global['POSITION_TIME']
  end

  def self.extract(target_id, ev)

    # check if the position is good
    return [] if ev.data['latitude'].nil? or ev.data['longitude'].nil?

    positioner_agg = Aggregate.target(target_id).find_or_create_by(type: :positioner, day: '0', aid: '0')

    min_time = minimum_time_in_a_position

    result = nil

    if positioner_agg.data[ev.aid.to_s] and positioner_agg.data[ev.aid.to_s]['last'] > ev.da
      agent = Item.agents.find(ev.aid)
      trace :error, "Position evidence not ordered #{ev.data['type']} [#{positioner_agg.data[ev.aid.to_s]['last']}, #{ev.da}], skipping evidence for agent #{agent.name}"
      return []
    end

    # load the positioner from the db, if already saved, otherwise create a new one
    if positioner_agg.data[ev.aid.to_s]
      begin
        #trace :debug, "Reloading positioner from saved status (#{ev.aid.to_s})"
        positioner = RCS::DB::Positioner.new_from_dump(positioner_agg.data[ev.aid.to_s]['positioner'])
      rescue Exception => e
        trace :warn, "Cannot restore positioner status, creating a new one..."
        positioner = RCS::DB::Positioner.new(time: min_time)
      end

      # check the day of the last position processed
      last_position_day = Time.at(positioner_agg.data[ev.aid.to_s]['last']).getutc.strftime('%Y%m%d')
      current_position_day = Time.at(ev.da).getutc.strftime('%Y%m%d')

      # if we detect that the day has changed, force the positioner to emit the point in the previous day
      if last_position_day != current_position_day
        positioner.emit_and_reset do |stay|
          result = stay
          trace :info, "Positioner has detected a change in the day, forcing output of a stay point: #{stay.to_s}"
        end
      end

    else
      trace :debug, "Creating a new positioner for #{ev.aid.to_s}"
      positioner = RCS::DB::Positioner.new(time: min_time)
    end

    trace :debug, "lat: #{ev.data['latitude'].to_f}, lon: #{ev.data['longitude'].to_f}, r: #{ev.data['accuracy'].to_i}"

    # create a point from the evidence
    point = Point.new(lat: ev.data['latitude'].to_f, lon: ev.data['longitude'].to_f, r: ev.data['accuracy'].to_i, time: ev.da)

    # feed the positioner with the point and take the result (if any)
    positioner.feed(point) do |stay|
      result = stay
      trace :info, "Positioner has detected a stay point: #{stay.to_s}"
    end

    # save the positioner status into the aggregate
    positioner_agg.data[ev.aid] = {positioner: positioner.dump, last: ev.da}
    positioner_agg.save

    # empty if not emitted
    return [] unless result

    # return the stay point
    return [{type: :position,
             time: result.end,
             point: {latitude: result.lat, longitude: result.lon, radius: result.r},
             timeframe: {start: result.start, end: result.end}}]
  end

  def self.find_similar_or_create_by(target_id, params)

    position = params[:data][:position]

    # extract the radius and don't save points that are too precise, enlarge it to the min similarity radius
    params[:data][:radius] = params[:data][:position][:radius]
    params[:data][:radius] = Point::MINIMUM_SIMILAR_RADIUS if params[:data][:radius] < Point::MINIMUM_SIMILAR_RADIUS

    # the idea here is:
    # search in the db for point near the current one
    # then check for similarity, if one is found, return the old one
    past = Aggregate.target(target_id).positions_within(position).to_a

    # sort the result by day in reverse order, so we get the most recent first
    past.sort_by! {|x| x.day}.reverse!

    # search if we have the same point in the past (starting from today)
    # return if found
    past.each do |agg|
      # convert aggregate to point
      old = agg.to_point
      new = Point.new(lat: position[:latitude], lon: position[:longitude], r: params[:data][:radius])

      # if similar, return the old point
      if old.similar_to? new
        if agg.day.eql? params[:day]
          return agg
        else
          # if the day is different, create a new one on current day, but same old position
          params[:data] = agg[:data]
          return Aggregate.target(target_id).create!(params)
        end
      end
    end

    # fetch the timezone
    begin
      params[:data][:timezone] = RCS::DB::PositionResolver.get_google_timezone(params[:data][:position].stringify_keys)
    rescue Exception => ex
      trace :error, "Unable to fetch timezone of new position aggregate: #{ex.message}"
    end

    # no previous match create a new one
    params[:data][:position] = [params[:data][:position][:longitude], params[:data][:position][:latitude]]

    Aggregate.target(target_id).create!(params)
  end

end

end
end