HabitatMap/AirCasting

View on GitHub
app/services/save_measurements.rb

Summary

Maintainability
C
1 day
Test Coverage
# Persists a batch of incoming fixed-session readings for one user.
#
# Every incoming stream is compared with the user's persisted streams by
# sensor name and by coordinates rounded to COORDINATE_PRECISION decimal
# places:
#
# * on a match the measurements are appended to the persisted stream and the
#   owning session/stream summary columns are refreshed;
# * otherwise a new FixedSession and Stream are created for the measurements.
#
# All writes happen inside a single FixedSession transaction.
class SaveMeasurements
  # Number of decimal places used when comparing coordinates.
  COORDINATE_PRECISION = 3

  # @param user [#id] owner of the sessions being written
  def initialize(user:)
    @user = user
  end

  # @param streams [#to_a] collection of `[stream, measurements]` pairs
  #   (e.g. a Hash keyed by stream). Each stream must respond to the sensor
  #   attribute readers used below and to `latitude=` / `longitude=`;
  #   `measurements` is an ordered, non-empty list of readings.
  # @return whatever the transaction block returns (not meaningful to callers)
  def call(streams:)
    FixedSession.transaction { save(streams) }
  end

  private

  attr_reader :user

  # Runs the whole import. The order of the steps mirrors the dependencies:
  # sessions must exist before streams, streams before measurements.
  def save(streams)
    persisted = persisted_streams_index
    pairs_to_append, pairs_to_create = partition_pairs(streams, persisted)

    session_ids = create_sessions(pairs_to_create)
    update_sessions(pairs_to_append, persisted)
    update_streams(pairs_to_append, persisted)
    created_stream_ids = create_streams(pairs_to_create, session_ids)
    appended_stream_ids =
      pairs_to_append.map do |stream, _measurements|
        persisted.fetch(stream_key(stream)).last
      end

    factory = RGeo::Geographic.spherical_factory(srid: 4326)
    import_measurements(pairs_to_create, created_stream_ids, factory)
    import_measurements(pairs_to_append, appended_stream_ids, factory)
  end

  # Builds `{ [latitude, longitude, sensor_name] => [session_id, stream_id] }`
  # for every stream that belongs to one of the user's sessions.
  def persisted_streams_index
    Stream
      .select(:id, :min_latitude, :min_longitude, :sensor_name, :session_id)
      .joins(:session)
      .where(session: { user_id: user.id })
      .load
      .each_with_object({}) do |stream, index|
        key = [
          stream.min_latitude.to_f,
          stream.min_longitude.to_f,
          stream.sensor_name,
        ]
        index[key] = [stream.session_id, stream.id]
      end
  end

  # Splits the incoming pairs into `[pairs_to_append, pairs_to_create]`.
  #
  # A matched stream has its latitude/longitude overwritten with the persisted
  # (unrounded) coordinates so that `stream_key` can later be used to look the
  # stream up in the persisted index.
  def partition_pairs(streams, persisted)
    # Round the persisted coordinates once instead of once per comparison.
    candidates =
      persisted.keys.map do |key|
        latitude, longitude, sensor_name = key
        {
          key: key,
          latitude: round_coordinate(latitude),
          longitude: round_coordinate(longitude),
          sensor_name: sensor_name,
        }
      end

    streams.to_a.partition do |stream, _measurements|
      latitude = round_coordinate(stream.latitude)
      longitude = round_coordinate(stream.longitude)
      sensor_name = stream.sensor_name

      match =
        candidates.find do |candidate|
          candidate[:sensor_name] == sensor_name &&
            candidate[:latitude] == latitude &&
            candidate[:longitude] == longitude
        end
      next false unless match

      persisted_latitude, persisted_longitude, _sensor_name = match.fetch(:key)
      stream.latitude = persisted_latitude
      stream.longitude = persisted_longitude
      true
    end
  end

  def round_coordinate(value)
    value.round(COORDINATE_PRECISION)
  end

  # Key under which a (matched) stream is stored in the persisted index.
  def stream_key(stream)
    [stream.latitude, stream.longitude, stream.sensor_name]
  end

  # Creates one FixedSession per pair and returns their ids, index-aligned
  # with `pairs`. Returns [] without touching the database when there is
  # nothing to create (FixedSession.last would be nil on an empty table).
  def create_sessions(pairs)
    return [] if pairs.empty?

    sessions =
      pairs.map { |stream, measurements| build_session(stream, measurements) }
    warn_on_failures('FixedSession', FixedSession.import(sessions))
    last_imported_ids(FixedSession, sessions.size)
  end

  def build_session(stream, measurements)
    uuid = SecureRandom.uuid
    first = measurements.first
    last = measurements.last

    FixedSession.new(
      user_id: user.id,
      title: first.title,
      contribute: true,
      start_time_local: first.time_local,
      end_time_local: last.time_local,
      last_measurement_at: last.time_local,
      is_indoor: false,
      latitude: stream.latitude,
      longitude: stream.longitude,
      uuid: uuid,
      url_token: uuid,
      time_zone: stream.time_zone,
    )
  end

  # Refreshes end time, last measurement time and title of every session that
  # received new measurements, using the LAST measurement of the batch.
  def update_sessions(pairs, persisted)
    updates =
      pairs.each_with_object({}) do |(stream, measurements), acc|
        session_id, _stream_id = persisted.fetch(stream_key(stream))
        last = measurements.last
        acc[session_id] = {
          'id' => session_id,
          'end_time_local' => last.time_local,
          'last_measurement_at' => last.time_local,
          'title' => last.title,
        }
      end

    rows =
      FixedSession
        .where(id: updates.keys)
        .map do |session|
          session
            .attributes
            .reject { |name, _value| name == 'tag_list' }
            .merge(updates.fetch(session.id))
        end

    # The column list is a fresh literal on purpose: the import library may
    # modify the array it is given.
    import =
      FixedSession.import(
        rows,
        on_duplicate_key_update: %i[end_time_local last_measurement_at title],
      )
    warn_on_failures('FixedSession', import)
  end

  # Sets the average value to the latest reading and adds the size of the
  # batch to the stored measurements count of every appended-to stream.
  def update_streams(pairs, persisted)
    updates =
      pairs.each_with_object({}) do |(stream, measurements), acc|
        _session_id, stream_id = persisted.fetch(stream_key(stream))
        acc[stream_id] = {
          'id' => stream_id,
          'average_value' => measurements.last.value,
          'measurements_count' => measurements.size,
        }
      end

    rows =
      Stream
        .where(id: updates.keys)
        .map do |stream|
          update = updates.fetch(stream.id)
          attributes = stream.attributes
          attributes
            .merge(update)
            .merge(
              'measurements_count' =>
                update.fetch('measurements_count') +
                  attributes.fetch('measurements_count'),
            )
        end

    import =
      Stream.import(
        rows,
        on_duplicate_key_update: %i[average_value measurements_count],
      )
    warn_on_failures('Stream', import)
  end

  # Creates one Stream per pair (attached to the session with the same index)
  # and returns the new stream ids, index-aligned with `pairs`.
  def create_streams(pairs, session_ids)
    return [] if pairs.empty?

    streams =
      pairs.each_with_index.map do |(stream, measurements), index|
        Stream.new(
          sensor_name: stream.sensor_name,
          unit_name: stream.unit_name,
          measurement_type: stream.measurement_type,
          measurement_short_type: stream.measurement_short_type,
          unit_symbol: stream.unit_symbol,
          threshold_set_id: stream.threshold_set_id,
          sensor_package_name: stream.sensor_package_name,
          min_latitude: stream.latitude,
          max_latitude: stream.latitude,
          min_longitude: stream.longitude,
          max_longitude: stream.longitude,
          start_latitude: stream.latitude,
          start_longitude: stream.longitude,
          session_id: session_ids[index],
          measurements_count: measurements.size,
          average_value: measurements.last.value,
        )
      end
    warn_on_failures('Stream', Stream.import(streams))
    last_imported_ids(Stream, streams.size)
  end

  # Imports every measurement of every pair, attaching the measurements of
  # `pairs[i]` to `stream_ids[i]`.
  def import_measurements(pairs, stream_ids, factory)
    records =
      pairs.zip(stream_ids).flat_map do |(_stream, measurements), stream_id|
        measurements.map do |measurement|
          Measurement.new(
            value: measurement.value,
            latitude: measurement.latitude,
            longitude: measurement.longitude,
            # The factory takes x (longitude) before y (latitude).
            location:
              factory.point(
                measurement.longitude.to_f,
                measurement.latitude.to_f,
              ),
            time: measurement.time_local,
            timezone_offset: nil,
            milliseconds: 0,
            measured_value: measurement.value,
            stream_id: stream_id,
            time_with_time_zone: measurement.time_with_time_zone,
          )
        end
      end
    warn_on_failures('Measurement', Measurement.import(records))
  end

  # Derives the ids of the `count` rows that were just bulk-inserted from the
  # id of the last row of the table, because the import result is not used to
  # obtain them.
  # https://github.com/zdennis/activerecord-import/issues/422
  #
  # NOTE(review): this assumes every instance was inserted and that the ids
  # are contiguous; if the import reported failed instances the range is
  # shifted — confirm whether that case needs handling.
  def last_imported_ids(model, count)
    last_id = model.last.id
    ((last_id - count + 1)..last_id).to_a
  end

  # Logs a warning when a bulk import rejected some of its instances.
  def warn_on_failures(model_name, import)
    return unless import.failed_instances.any?

    Rails
      .logger.warn "#{model_name}.import failed for: #{import.failed_instances.inspect}"
  end
end