opf/openproject

View on GitHub
app/services/journals/create_service.rb

Summary

Maintainability
A
0 mins
Test Coverage
#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) 2012-2024 the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++

# Will create journals for a journable (e.g. WorkPackage and Meeting)
# As a journal is basically a copy of the current state of the database, consisting of the journable as well as its
# custom values and attachments, those entries are copied in the database.
# Copying and thereby creation only takes place if a change of the current state and the last journal is identified.
# Note, that the adequate creation of a journal which represents the state that is generated by a single user action depends on
# no other user/action altering the current state at the same time especially in a multi process/thread setup.
# Therefore, the whole update of a journable needs to be safeguarded by a mutex. In our implementation, we use
#
# OpenProject::Mutex.with_advisory_lock_transaction(journable)
#
# for this purpose.

# rubocop:disable Rails/SquishedSQLHeredocs
module Journals
  class CreateService
    # The record to be journalized and the user whose id is written to the journal.
    attr_accessor :journable, :user

    # @param journable the record a journal is created for
    # @param user the user performing the change
    def initialize(journable, user)
      self.user, self.journable = user, journable
    end

    # Creates the journal (or aggregates into the predecessor) inside a transaction.
    #
    # @param notes [String] optional comment to store on the journal
    # @param cause the cause of the change, a CauseOfChange::NoCause by default
    # @return the successful ServiceResult whose result is the journal, or nil if none was written
    def call(notes: "", cause: CauseOfChange::NoCause.new)
      Journal.transaction do
        written = create_journal(notes, cause)
        # Nothing to refresh on the association if no journal row came back.
        reload_journals if written

        ServiceResult.success(result: written)
      end
    end

    private

    # If the journalizing happens within the configured aggregation time, is carried out by the same user, has an
    # identical cause and only the predecessor or the journal to be created has notes, the changes are aggregated.
    # Instead of removing the predecessor, return it here so that it can be stripped in the journal creating
    # SQL to then be refilled. That way, references to the journal, including ones users have, are kept intact.
    def aggregatable_predecessor(notes, cause)
      candidate = journable.last_journal

      # nil tells the callers that a new journal is to be inserted instead of aggregating.
      aggregatable?(candidate, notes, cause) ? candidate : nil
    end

    # Runs the journal creating SQL and wraps the returned row in a Journal instance.
    #
    # @return the instantiated journal, or nil if the statement returned no row
    def create_journal(notes, cause)
      predecessor = aggregatable_predecessor(notes, cause)
      log_journal_creation(predecessor)

      statement = create_journal_sql(predecessor, notes, cause)

      # The query cache is bypassed on purpose: calling the service repeatedly
      # for the same journable could otherwise hand back a cached row from an
      # earlier run, making later code believe that a journal was created.
      row = Journal.connection.uncached { ::Journal.connection.select_one(statement) }

      row ? Journal.instantiate(row) : nil
    end

    # The result of the whole SQL statement is a snapshot of the journable (e.g. a WorkPackage or a WikiPage) at the point
    # the SQL statement was run:
    # * There will be either a newly created entry in the `journals` table or an updated entry in it if the former journal
    #   of the journable was aggregated (two consecutive updates within a configurable time frame by the same user).
    # * A new entry in the data table of the journable (e.g. work_package_journals for a WorkPackage) will be created
    #   containing a copy of the columns of the journable.
    # * New entries in the attachable_journals table, one for every attachment the journable has at the time.
    # * New entries in the customizable_journals table, one for every custom value the journable has at the time.
    # * New entries in the storages_file_links_journals table, one for every file link the journable has at the time.
    # * New entries in the meeting_agenda_item_journals table, one for every agenda item the journable has at the time.
    #
    # It consists of a couple of parts that are kept as individual queries (as CTEs) but
    # are all executed within a single database call.
    #
    # The first four CTEs('cleanup_predecessor_data', 'cleanup_predecessor_attachable', 'cleanup_predecessor_customizable'
    # and cleanup_predecessor_storable) strip the information of a predecessor if one exists. If no predecessor exists,
    # a noop SQL statement is employed instead.
    # To strip the information from the journal, the data record (e.g. from work_packages_journals) as well as the
    # attachment and custom value information is removed. The journal itself is kept and will later on have its
    # updated_at and possibly its notes property updated.
    #
    # The next CTE's (`max_journals`) responsibility is to fetch the latest journal and have that available for later queries
    # (i.e. when determining the latest state of the journable, when getting the current version number and when
    # comparing the timestamps of the last journalization time and the work package's updated_at time).
    #
    # The next CTE (`changes`) determines whether a change has occurred so that a new journal needs to be created. This check
    # decides whether journalization needs to be carried out at all. To determine
    # whether a change is worthy of being journalized, the current and the latest journalized state are compared in five aspects:
    # * the journable's table columns are compared to the columns in the journable's journal data table
    # (e.g. work_package_journals for WorkPackages). Only columns that exist in the journable's journal data table are considered
    # (and some columns like the primary key `id` are ignored). Therefore, to add an attribute to be journalized, it needs to
    # be added to that table.
    # * the journable's attachments are compared to the attachable_journals entries being associated with the most recent journal.
    # * the journable's custom values are compared to the customizable_journals entries being associated with the most
    #   recent journal.
    # * the journable's file_links are compared to the storages_file_links_journals entries being associated with the most
    #   recent journal.
    # * the journable's meeting_agenda_items are compared to the meeting_agenda_item_journals entries being associated with the
    #   most recent journal.
    # When comparing text based values, newlines are normalized as otherwise users having a different OS might change a text value
    # without intending to.
    #
    # Journalization is then continued only if
    # * a change has been identified (by the `changes` CTE)
    # * OR a note is present
    # * OR a cause is present (which would be different from the cause of the predecessor as that one would otherwise be
    #   aggregated with)
    # * OR a predecessor is replaced
    #
    # To enforce consistent timestamps throughout the data structure of journable and journal, the time used for further timestamp
    # setting is then calculated once between the two CTEs `touch_journable` and `fetch_time`. The auxiliary tables
    # (attachable_journals, customizable_journals and storages_file_links_journals) don't have timestamps so they can be
    # disregarded.
    #
    # Most of the time, the time used will be the updated_at of the journable. This follows the logic that most of the time
    # the journable receives new attributes first which will subsequently trigger the run of this service. During the course
    # of the journable saving, the updated_at will receive a current timestamp by Rails. But if only a cause or a note is added,
    # or if a custom value, an attachable or a file_link is altered, the journable will not have been touched before and
    # therefore the time this SQL statement is run at will be used. The SQL will in this case touch the journable with that
    # timestamp itself (`touch_journable`).
    # Whether the journable was updated before or the SQL statement did it, the value of either will end up in the result
    # of the `fetch_time` CTE to be used in the later stages of the SQL.
    #
    # After the SQL is run, the timestamps of the journable, the predecessor journal and the newly created journal will have
    # interdependencies:
    # * If a new journal is created (i.e. no predecessor is aggregated):
    #   * The updated_at of the journable, the newly created journal's created_at, updated_at and the lower bound of its
    #     validity_period as well as the upper bound of the predecessor's validity_period will be the same (`fetch_time` value).
    #   * The upper bound of the newly created journal's validity_period will be NULL meaning that it does not end yet.
    # * If a predecessor is aggregated:
    #   * The updated_at of the journable and the aggregated journal's updated_at will be the same.
    #   * The created_at of the aggregated journal as well as its lower bound of the validity_period will be the same as
    #     before.
    # The timestamps are updated by `touch_journable` and `update_predecessor` respectively.
    #
    # In case of no aggregation, the preceding journal will now have values for both the upper as well as the lower bound
    # of its validity_period. Such a journal can then be considered closed.
    #
    # The `inserted_journal` will either update the updated_at value of the aggregated predecessor or, in the absence
    # of an aggregated predecessor create a new journal with the timestamps as described above. Both rely on the return
    # value of `insert_data`. That CTE, on the basis of what was identified in `changes` (and only if there are some)
    # writes the snapshot of the journable's columns into the correct data journal (e.g. work_package_journals for a
    # WorkPackage). The return values of the `insert_data` is relevant also for the `inserted_journal` CTE as the `data_id` field
    # needs to be inserted into the journal. This is also the reason why the `insert_data` CTE is run before
    # the `inserted_journal`.
    #
    # All cases (having a change, a note or a cause) can at this point be identified by a journal having been created
    # or replaced which are treated the same.
    # Therefore, the return value of the `inserted_journal` is further on used to identify whether the next statements
    # (`insert_attachable`, `insert_customizable` and `insert_storable`) should actually insert data. It is additionally
    # used as the values returned by the overall SQL statement so that an AR instance can be instantiated with it.
    #
    def create_journal_sql(predecessor, notes, cause)
      # The journal CTEs always come first. The association CTEs follow, one
      # fragment per association the journable supports (possibly none).
      cte_clauses = [
        journal_modification_sql(predecessor, notes, cause),
        *relation_modifications_sql(predecessor)
      ]

      <<~SQL
        WITH #{cte_clauses.join(',')}
        SELECT * from inserted_journal
      SQL
    end

    # Builds the comma separated CTE definitions that deal with the journal row itself and its data record.
    # The fragment is meant to follow a `WITH` keyword. The order matters for readers as the later CTEs
    # reference the earlier ones by name (e.g. `inserted_journal` reads from `insert_data` and `fetch_time`).
    #
    # @param predecessor the journal being aggregated with, or nil if a new journal is to be inserted
    # @param notes the notes to store on the journal
    # @param cause the cause to store on the journal
    # @return [String] the CTE definitions, without the leading `WITH`
    def journal_modification_sql(predecessor, notes, cause)
      <<~SQL
        cleanup_predecessor_data AS (
          #{cleanup_predecessor_data(predecessor)}
        ), max_journals AS (
          #{select_max_journal_sql(predecessor)}
        ), changes AS (
          #{select_changed_sql}
        ), touch_journable AS (
          #{touch_journable_sql(predecessor, notes, cause)}
        ), fetch_time AS (
          #{fetch_time_sql}
        ), insert_data AS (
          #{insert_data_sql(predecessor, notes, cause)}
        ), update_predecessor AS (
          #{update_predecessor_sql(predecessor, notes, cause)}
        ), inserted_journal AS (
          #{update_or_insert_journal_sql(predecessor, notes, cause)}
        )
      SQL
    end

    # Collects the CTE definitions of every association the journable supports.
    #
    # @return [Array<String>] one fragment per supported association, empty if there is none
    def relation_modifications_sql(predecessor)
      [].tap do |fragments|
        for_supported_associations do |association|
          fragments << association_modifications_sql(association, predecessor)
        end
      end
    end

    # Yields the name of every journalized association the journable supports.
    # An association counts as supported if the journable responds to the probe method (the hash key).
    def for_supported_associations
      {
        attachable?: :attachable,
        customizable?: :customizable,
        file_links: :storable,
        agenda_items: :agenda_itemable
      }.each do |probe, association|
        next unless journable.respond_to?(probe)

        yield association
      end
    end

    # Builds the two CTEs of a single association: the cleanup of the predecessor's
    # entries and the insertion of the current entries. Both statements are looked up
    # by naming convention (`cleanup_predecessor_<association>` and `insert_<association>_sql`).
    def association_modifications_sql(association, predecessor)
      cleanup_sql = send(:"cleanup_predecessor_#{association}", predecessor)
      insert_sql = send(:"insert_#{association}_sql")

      <<~SQL
        cleanup_predecessor_#{association} AS (
          #{cleanup_sql}
        ), insert_#{association} AS (
          #{insert_sql}
        )
      SQL
    end

    # Deletes the predecessor's data record: the row of the journable's data table whose id is the predecessor's data_id.
    def cleanup_predecessor_data(predecessor)
      cleanup_predecessor(predecessor, data_table_name, :id, :data_id)
    end

    # Deletes the attachable_journals rows that belong to the predecessor.
    def cleanup_predecessor_attachable(predecessor)
      cleanup_predecessor(predecessor, "attachable_journals", :journal_id, :id)
    end

    # Deletes the customizable_journals rows that belong to the predecessor.
    def cleanup_predecessor_customizable(predecessor)
      cleanup_predecessor(predecessor, "customizable_journals", :journal_id, :id)
    end

    # Deletes the storages_file_links_journals rows that belong to the predecessor.
    def cleanup_predecessor_storable(predecessor)
      cleanup_predecessor(predecessor, "storages_file_links_journals", :journal_id, :id)
    end

    # Deletes the meeting_agenda_item_journals rows that belong to the predecessor.
    def cleanup_predecessor_agenda_itemable(predecessor)
      cleanup_predecessor(predecessor, "meeting_agenda_item_journals", :journal_id, :id)
    end

    # Builds the statement deleting the predecessor's rows from the given table.
    #
    # @param predecessor the journal being aggregated with, or nil
    # @param table_name [String] the table to delete from
    # @param column [Symbol] the column of that table to match; also used as the placeholder name
    # @param referenced_id [Symbol] the predecessor attribute providing the value to match
    # @return [String] the DELETE statement, or the noop "SELECT 1" without a predecessor
    def cleanup_predecessor(predecessor, table_name, column, referenced_id)
      if predecessor
        delete_sql = <<~SQL
          DELETE
          FROM
           #{table_name}
          WHERE
           #{column} = :#{column}
        SQL

        sanitize(delete_sql, column => predecessor.send(referenced_id))
      else
        "SELECT 1"
      end
    end

    # Picks the statement for the `inserted_journal` CTE: a rewrite of the aggregated
    # predecessor if there is one, the insertion of a new journal otherwise.
    def update_or_insert_journal_sql(predecessor, notes, cause)
      return insert_journal_sql(notes, cause) unless predecessor

      update_journal_sql(predecessor, notes, cause)
    end

    # Builds the UPDATE rewriting the aggregated predecessor journal.
    # The journal is only updated (and returned) if `insert_data` produced a row, as the
    # statement selects FROM that CTE. The new notes win over the predecessor's notes if present.
    #
    # @param predecessor the journal to rewrite
    # @param notes the notes passed to the service
    # @param cause the cause passed to the service, stored JSON encoded
    # @return [String] the sanitized UPDATE ... RETURNING statement
    def update_journal_sql(predecessor, notes, cause)
      # If there is a predecessor, we don't want to create a new one, we simply rewrite it.
      # The original data of that predecessor (data e.g. work_package_journals, customizable_journals, attachable_journals)
      # has been deleted before but the notes need to be taken over and the timestamps updated as if the
      # journal would have been created.
      #
      # A lot of the data does not need to be set anew, since we only aggregate if that data stays the same
      # (e.g. the user_id).
      journal_sql = <<~SQL
        UPDATE
          journals
        SET
          notes = :notes,
          updated_at = (SELECT updated_at FROM fetch_time),
          data_id = insert_data.id,
          cause = :cause
        FROM insert_data
        WHERE journals.id = :predecessor_id
        RETURNING
          journals.*
      SQL

      sanitize(journal_sql,
               notes: notes.presence || predecessor.notes,
               predecessor_id: predecessor.id,
               cause: cause_sql(cause))
    end

    # Builds the INSERT creating a new journal row.
    # * version is the version found in `max_journals` plus one.
    # * created_at, updated_at and the lower bound of validity_period all receive the `fetch_time` value;
    #   the upper bound of validity_period is left open (NULL).
    # * data_id points to the row written by `insert_data`. As the statement selects FROM that CTE,
    #   no journal is inserted if `insert_data` returned no row.
    #
    # @param notes the notes to store on the journal
    # @param cause the cause passed to the service, stored JSON encoded
    # @return [String] the sanitized INSERT ... RETURNING statement
    def insert_journal_sql(notes, cause)
      journal_sql = <<~SQL
        INSERT INTO
          journals (
            journable_id,
            journable_type,
            version,
            user_id,
            notes,
            created_at,
            updated_at,
            data_id,
            data_type,
            cause,
            validity_period
          )
        SELECT
          :journable_id,
          :journable_type,
          COALESCE(max_journals.version, 0) + 1,
          :user_id,
          :notes,
          (SELECT updated_at FROM fetch_time),
          (SELECT updated_at FROM fetch_time),
          insert_data.id,
          :data_type,
          :cause,
          tstzrange((SELECT updated_at FROM fetch_time), NULL)
        FROM max_journals, insert_data
        RETURNING *
      SQL

      sanitize(journal_sql,
               notes:,
               cause: cause_sql(cause),
               journable_id: journable.id,
               journable_type:,
               user_id: user.id,
               data_type: journable.class.journal_class.name)
    end

    # Builds the INSERT copying the journable's journaled columns into its data table
    # (the snapshot referenced by the journal's data_id).
    # The source columns are read from the journable's table, optionally extended by the SQL
    # returned from `journable_data_sql_addition`. Unless forced (see
    # `only_on_changed_or_forced_condition_sql`), the copy only happens if `changes` has rows.
    #
    # @return [String] the sanitized INSERT ... RETURNING statement
    def insert_data_sql(predecessor, notes, cause)
      data_sql = <<~SQL
        INSERT INTO
          #{data_table_name} (
            #{data_sink_columns}
          )
        SELECT
          #{data_source_columns}
        FROM #{journable_table_name}
        #{journable_data_sql_addition}
        WHERE
          #{journable_table_name}.id = :journable_id
          #{only_on_changed_or_forced_condition_sql(predecessor, notes, cause)}
        RETURNING *
      SQL

      sanitize(data_sql,
               journable_id: journable.id)
    end

    # Name of the journable's base class. Used to match the container_type/customized_type
    # columns of the journable's attachments, custom values and file links.
    def journable_class_name
      base_class = journable.class.base_class
      base_class.name
    end

    # Builds the INSERT writing one attachable_journals row per attachment currently
    # attached to the journable. Nothing is inserted unless `inserted_journal` returned a row.
    #
    # @return [String] the sanitized INSERT statement
    def insert_attachable_sql
      attachable_sql = <<~SQL
        INSERT INTO
          attachable_journals (
            journal_id,
            attachment_id,
            filename
          )
        SELECT
          #{id_from_inserted_journal_sql},
          attachments.id,
          attachments.file
        FROM attachments
        WHERE
          #{only_if_created_sql}
          AND attachments.container_id = :journable_id
          AND attachments.container_type = :journable_class_name
      SQL

      sanitize(attachable_sql,
               journable_id: journable.id,
               journable_class_name:)
    end

    # Builds the INSERT writing one customizable_journals row per custom value of the journable.
    # NULL and empty string values are skipped, and the newlines of the stored value are normalized.
    # Nothing is inserted unless `inserted_journal` returned a row.
    #
    # @return [String] the sanitized INSERT statement
    def insert_customizable_sql
      customizable_sql = <<~SQL
        INSERT INTO
          customizable_journals (
            journal_id,
            custom_field_id,
            value
          )
        SELECT
          #{id_from_inserted_journal_sql},
          custom_values.custom_field_id,
          #{normalize_newlines_sql('custom_values.value')}
        FROM custom_values
        WHERE
          #{only_if_created_sql}
          AND custom_values.customized_id = :journable_id
          AND custom_values.customized_type = :journable_class_name
          AND custom_values.value IS NOT NULL
          AND custom_values.value != ''
      SQL

      sanitize(customizable_sql,
               journable_id: journable.id,
               journable_class_name:)
    end

    # Builds the INSERT writing one storages_file_links_journals row per file link of the journable.
    # The storage's name is copied along; because of the LEFT JOIN it is NULL if the storage row is missing.
    # Nothing is inserted unless `inserted_journal` returned a row.
    #
    # @return [String] the sanitized INSERT statement
    def insert_storable_sql
      storable_sql = <<~SQL
        INSERT INTO
          storages_file_links_journals (
            journal_id,
            file_link_id,
            link_name,
            storage_name
          )
        SELECT
          #{id_from_inserted_journal_sql},
          file_links.id,
          file_links.origin_name,
          storages.name
        FROM file_links left join storages ON file_links.storage_id = storages.id
        WHERE
          #{only_if_created_sql}
          AND file_links.container_id = :journable_id
          AND file_links.container_type = :journable_class_name
      SQL

      sanitize(storable_sql,
               journable_id: journable.id,
               journable_class_name:)
    end

    # Builds the INSERT writing one meeting_agenda_item_journals row per agenda item whose
    # meeting_id is the journable's id. Nothing is inserted unless `inserted_journal` returned a row.
    #
    # @return [String] the sanitized INSERT statement
    def insert_agenda_itemable_sql
      agenda_itemable_sql = <<~SQL
        INSERT INTO
          meeting_agenda_item_journals (
            journal_id,
            agenda_item_id,
            author_id,
            title,
            notes,
            position,
            duration_in_minutes,
            start_time,
            end_time,
            work_package_id,
            item_type
          )
        SELECT
          #{id_from_inserted_journal_sql},
          agenda_items.id,
          agenda_items.author_id,
          agenda_items.title,
          agenda_items.notes,
          agenda_items.position,
          agenda_items.duration_in_minutes,
          agenda_items.start_time,
          agenda_items.end_time,
          agenda_items.work_package_id,
          agenda_items.item_type
        FROM meeting_agenda_items agenda_items
        WHERE
          #{only_if_created_sql}
          AND agenda_items.meeting_id = :journable_id
      SQL

      sanitize(agenda_itemable_sql,
               journable_id: journable.id)
    end

    # Updates the updated_at timestamp of the journable.
    # That is only carried out if the journable doesn't already have a newer timestamp than the most recent journal or
    # hasn't been updated by the `#save` call after which this service runs.
    # Most recent in this case can mean one of two things:
    #  * if there is a predecessor we are aggregating with, it is that predecessor
    #  * otherwise, the most recent journal that we are not aggregating with.
    # Whenever an attribute is updated on the journable before creating the journal, the updated_at timestamp
    # will already have been increased so nothing needs to be done.
    # But if any of the associated data is updated or if only a cause or note is added, the journable would
    # otherwise not have received an updated timestamp.
    def touch_journable_sql(predecessor, notes, cause)
      unless journable.class.aaj_options[:timestamp].to_sym == :updated_at
        # Journables tracked by a different timestamp column are not touched. The NULL returned
        # here makes `fetch_time` fall back to the journable's own timestamp.
        return <<~SQL
          SELECT NULL::timestamp with time zone AS updated_at
        SQL
      end

      touch_sql = <<~SQL
        UPDATE
          #{journable_table_name}
        SET
          updated_at = COALESCE(:update_timestamp, statement_timestamp())
        WHERE
          id = :id
          #{only_on_changed_or_forced_condition_sql(predecessor, notes, cause)}
          AND NOT updated_at > COALESCE(:predecessor_timestamp, (SELECT updated_at FROM max_journals))
        RETURNING updated_at
      SQL

      # Reuse the timestamp written by the preceding save if there was one,
      # otherwise the database's statement timestamp is used.
      saved_timestamp = journable.updated_at if journable.updated_at_previously_changed?

      sanitize(touch_sql,
               update_timestamp: saved_timestamp,
               predecessor_timestamp: predecessor&.updated_at,
               id: journable.id)
    end

    # Fetches the timestamp to be used by all subsequent SQL statements e.g. for
    # * setting the created_at and updated_at timestamps of the newly created journal
    # * setting the updated_at timestamp on an updated (aggregated with) journal
    # * setting the validity_period (upper bound) of the preceding journal.
    def fetch_time_sql
      # Prefer the timestamp `touch_journable` has just written, fall back to the journable's own one.
      time_query = "SELECT COALESCE((SELECT updated_at FROM touch_journable), :journable_timestamp) AS updated_at\n"

      sanitize(time_query, journable_timestamp:)
    end

    # Sets the validity_period's upper boundary of the preceding journal to the created_at timestamp of the inserted journal.
    # The upper bound set is not included.
    # If there is a predecessor (meaning we are aggregating/updating an existing journal), nothing is done since
    # in that case the preceding journal is the one we are currently aggregating with so it will still remain
    # the most recent one.
    def update_predecessor_sql(predecessor, notes, cause)
      if predecessor.present?
        # When aggregating, the predecessor stays the most recent journal: nothing to close.
        "SELECT 1"
      else
        <<~SQL
          UPDATE
            journals
          SET
            validity_period = tstzrange(lower(validity_period), (SELECT updated_at FROM fetch_time), '[)')
          WHERE
            id = (SELECT id from max_journals)
            #{only_on_changed_or_forced_condition_sql(predecessor, notes, cause)}
        SQL
      end
    end

    # Builds the SELECT for the `max_journals` CTE: the most recent journal of the journable or,
    # when aggregating, the most recent journal before the predecessor (see `max_journal_sql`).
    # The RIGHT OUTER JOIN against the one-row `fallback` ensures a row is returned even if no
    # such journal exists. In that case version, id and data_id are 0 and updated_at is NULL.
    #
    # @param predecessor the journal being aggregated with, or nil
    # @return [String] the sanitized SELECT statement
    def select_max_journal_sql(predecessor)
      sql = <<~SQL
        SELECT
          :journable_id journable_id,
          :journable_type journable_type,
          updated_at,
          COALESCE(journals.version, fallback.version) AS version,
          COALESCE(journals.id, 0) id,
          COALESCE(journals.data_id, 0) data_id
        FROM
          journals
        RIGHT OUTER JOIN
          (SELECT 0 AS version) fallback
        ON
          journals.journable_id = :journable_id
          AND journals.journable_type = :journable_type
          AND journals.version IN (#{max_journal_sql(predecessor)})
      SQL

      sanitize(sql,
               journable_id: journable.id,
               journable_type:)
    end

    # Builds the SELECT for the `changes` CTE. The changes of the journable's own columns are
    # FULL JOINed with the changes of every supported association, so a row results as soon
    # as any of the parts reports a change.
    #
    # @return [String] the SELECT statement
    def select_changed_sql
      parts = [<<~SQL]
        SELECT
           *
        FROM
          (#{data_changes_sql}) data_changes
      SQL

      for_supported_associations do |association|
        association_changes = send(:"#{association}_changes_sql")

        parts << <<~SQL
          FULL JOIN
            (#{association_changes}) #{association}_changes
          ON
            #{association}_changes.journable_id = data_changes.journable_id
        SQL
      end

      parts.join
    end

    # Builds the SELECT detecting attachment changes: rows are returned for every attachment
    # that exists either only on the journable or only in the attachable_journals of the
    # `max_journals` journal. An empty result means the attachments are unchanged.
    #
    # @return [String] the sanitized SELECT statement
    def attachable_changes_sql
      attachable_changes_sql = <<~SQL
        SELECT
          max_journals.journable_id
        FROM
          max_journals
        LEFT OUTER JOIN
          attachable_journals
        ON
          attachable_journals.journal_id = max_journals.id
        FULL JOIN
          (SELECT *
           FROM attachments
           WHERE attachments.container_id = :journable_id AND attachments.container_type = :container_type) attachments
        ON
          attachments.id = attachable_journals.attachment_id
        WHERE
          (attachments.id IS NULL AND attachable_journals.attachment_id IS NOT NULL)
          OR (attachable_journals.attachment_id IS NULL AND attachments.id IS NOT NULL)
      SQL

      sanitize(attachable_changes_sql,
               journable_id: journable.id,
               container_type: journable_class_name)
    end

    # Builds the SELECT detecting custom value changes. Current custom values and the
    # customizable_journals of the `max_journals` journal are matched by custom_field_id. A row is
    # returned if a value exists on one side only (an empty string on the journable counts as absent)
    # or if the two values differ after newline normalization.
    #
    # @return [String] the sanitized SELECT statement
    def customizable_changes_sql
      customizable_changes_sql = <<~SQL
        SELECT
          max_journals.journable_id
        FROM
          max_journals
        LEFT OUTER JOIN
          customizable_journals
        ON
          customizable_journals.journal_id = max_journals.id
        FULL JOIN
          (SELECT *
           FROM custom_values
           WHERE custom_values.customized_id = :journable_id AND custom_values.customized_type = :customized_type) custom_values
        ON
          custom_values.custom_field_id = customizable_journals.custom_field_id
        WHERE
          (custom_values.value IS NULL AND customizable_journals.value IS NOT NULL)
          OR (customizable_journals.value IS NULL AND custom_values.value IS NOT NULL AND custom_values.value != '')
          OR (#{normalize_newlines_sql('customizable_journals.value')} !=
              #{normalize_newlines_sql('custom_values.value')})
      SQL

      sanitize(customizable_changes_sql,
               customized_type: journable_class_name,
               journable_id: journable.id)
    end

    # Builds the SELECT detecting file link changes: rows are returned for every file link
    # that exists either only on the journable or only in the storages_file_links_journals of
    # the `max_journals` journal. An empty result means the file links are unchanged.
    #
    # @return [String] the sanitized SELECT statement
    def storable_changes_sql
      storables_changes_sql = <<~SQL
        SELECT
          max_journals.journable_id
        FROM
          max_journals
        LEFT OUTER JOIN
          storages_file_links_journals
        ON
          storages_file_links_journals.journal_id = max_journals.id
        FULL JOIN
          (SELECT *
           FROM file_links
           WHERE file_links.container_id = :journable_id AND file_links.container_type = :container_type) file_links
        ON
          file_links.id = storages_file_links_journals.file_link_id
        WHERE
          (file_links.id IS NULL AND storages_file_links_journals.file_link_id IS NOT NULL)
          OR (storages_file_links_journals.file_link_id IS NULL AND file_links.id IS NOT NULL)
      SQL

      sanitize(storables_changes_sql,
               journable_id: journable.id,
               container_type: journable_class_name)
    end

    # Builds the SELECT detecting agenda item changes. The meeting's current agenda items and the
    # meeting_agenda_item_journals of the `max_journals` journal are matched by agenda item id.
    # A row is returned if an item exists on one side only or if any of the compared columns
    # differs (notes are compared after newline normalization).
    #
    # @return [String] the sanitized SELECT statement
    def agenda_itemable_changes_sql
      agenda_itemable_changes_sql = <<~SQL
        SELECT
          max_journals.journable_id
        FROM
          max_journals
        LEFT OUTER JOIN
          meeting_agenda_item_journals
        ON
          meeting_agenda_item_journals.journal_id = max_journals.id
        FULL JOIN
          (SELECT *
           FROM meeting_agenda_items
           WHERE meeting_agenda_items.meeting_id = :journable_id) agenda_items
        ON
          agenda_items.id = meeting_agenda_item_journals.agenda_item_id
        WHERE
          (agenda_items.id IS DISTINCT FROM meeting_agenda_item_journals.agenda_item_id)
          OR (agenda_items.title IS DISTINCT FROM meeting_agenda_item_journals.title)
          OR (#{normalize_newlines_sql('agenda_items.notes')} IS DISTINCT FROM
              #{normalize_newlines_sql('meeting_agenda_item_journals.notes')})
          OR (agenda_items.position IS DISTINCT FROM meeting_agenda_item_journals.position)
          OR (agenda_items.duration_in_minutes IS DISTINCT FROM meeting_agenda_item_journals.duration_in_minutes)
          OR (agenda_items.start_time IS DISTINCT FROM meeting_agenda_item_journals.start_time)
          OR (agenda_items.end_time IS DISTINCT FROM meeting_agenda_item_journals.end_time)
          OR (agenda_items.work_package_id IS DISTINCT FROM meeting_agenda_item_journals.work_package_id)
          OR (agenda_items.item_type IS DISTINCT FROM meeting_agenda_item_journals.item_type)
      SQL

      sanitize(agenda_itemable_changes_sql,
               journable_id: journable.id)
    end

    # Builds the SELECT detecting changes in the journable's own columns. The journable's row
    # (optionally extended by `journable_data_sql_addition`) is LEFT JOINed with the data record
    # of the `max_journals` journal; the journable's id is returned if any condition produced
    # by `data_changes_condition_sql` holds.
    #
    # @return [String] the sanitized SELECT statement
    def data_changes_sql
      data_changes_sql = <<~SQL
        SELECT
          #{journable_table_name}.id journable_id
        FROM
          (SELECT * FROM #{journable_table_name} #{journable_data_sql_addition}) #{journable_table_name}
        LEFT JOIN
          (SELECT * FROM max_journals
           JOIN
             #{data_table_name}
           ON
             #{data_table_name}.id = max_journals.data_id) #{data_table_name}
        ON
          #{journable_table_name}.id = #{data_table_name}.journable_id
        WHERE
          #{journable_table_name}.id = :journable_id AND (#{data_changes_condition_sql})
      SQL

      sanitize(data_changes_sql,
               journable_id: journable.id)
    end

    # Builds the subquery selecting the highest journal version of the journable.
    # When aggregating, only versions below the predecessor's version are considered.
    #
    # @param predecessor the journal being aggregated with, or nil
    # @return [String] the sanitized SELECT statement
    def max_journal_sql(predecessor)
      statement = <<~SQL
        SELECT MAX(version)
        FROM journals
        WHERE journable_id = :journable_id
        AND journable_type = :journable_type
      SQL
      binds = { journable_id: journable.id, journable_type: }

      if predecessor
        statement = "#{statement} AND version < :predecessor_version"
        binds[:predecessor_version] = predecessor.version
      end

      sanitize(statement, **binds)
    end

    # Condition limiting a statement to the case that a journal was inserted or rewritten.
    def only_if_created_sql = "EXISTS (SELECT * from inserted_journal)"

    # Subquery providing the id of the journal returned by the `inserted_journal` CTE.
    def id_from_inserted_journal_sql = "(SELECT id FROM inserted_journal)"

    # Builds the OR-ed conditions comparing every journaled column of the journable with the
    # same column of the data record. Text columns are compared after newline normalization,
    # all other columns via IS DISTINCT FROM.
    #
    # @return [String] the conditions joined by " OR "
    def data_changes_condition_sql
      source = journable_table_name
      sink = data_table_name
      text_columns = text_column_names
      plain_columns = journable.journaled_columns_names - text_columns

      plain_conditions = plain_columns.map do |column|
        <<~SQL
          (#{source}.#{column} IS DISTINCT FROM #{sink}.#{column})
        SQL
      end

      text_conditions = text_columns.map do |column|
        <<~SQL
          #{normalize_newlines_sql("#{source}.#{column}")} !=
           #{normalize_newlines_sql("#{sink}.#{column}")}
        SQL
      end

      (plain_conditions + text_conditions).join(" OR ")
    end

    # Returns the SQL condition requiring rows in `changes`, or an empty string if the
    # journalization is forced by notes, a cause or an aggregated predecessor.
    def only_on_changed_or_forced_condition_sql(predecessor, notes, cause)
      # The predecessor part of the condition is there in case the predecessor is being aggregated.
      # In one of the cases, the change that is being aggregated nullifies the changes done by the predecessor so
      # that in effect, there would be no changes any more (compared to the predecessor's predecessor).
      # With changes being a precondition for journalizing, no journal data would be created and the predecessor
      # that is aggregated would end up having no data.
      change_required = predecessor.nil? && notes.blank? && cause.blank?

      change_required ? "AND EXISTS (SELECT * FROM changes)" : ""
    end

    # Column list for the data table INSERT: the non-text columns first, the text columns last.
    # The order has to match the one produced by `data_source_columns`.
    def data_sink_columns
      text_columns = text_column_names
      plain_columns = journable.journaled_columns_names - text_columns

      [*plain_columns, *text_columns].join(", ")
    end

    # Select list for the data table INSERT: the non-text columns first, followed by the
    # text columns wrapped in the newline normalization.
    def data_source_columns
      text_columns = text_column_names
      plain_columns = journable.journaled_columns_names - text_columns
      normalized_columns = text_columns.map { normalize_newlines_sql(_1) }

      (plain_columns + normalized_columns).join(", ")
    end

    # Additional SQL the journable's class configures via the :data_sql option.
    # The option is called with the journable; an empty string is returned if the option
    # is not set or if it returns nothing.
    def journable_data_sql_addition
      data_sql = journable.class.aaj_options[:data_sql]
      return "" if data_sql.nil?

      data_sql.call(journable) || ""
    end

    # Names (as symbols) of the journable's columns of type :text that are journaled.
    def text_column_names
      text_columns = journable.class.columns_hash.filter_map do |name, column|
        name.to_sym if column.type == :text
      end

      text_columns & journable.journaled_columns_names
    end

    # Value of the journable attribute configured as its :timestamp option.
    def journable_timestamp
      timestamp_attribute = journable.class.aaj_options[:timestamp]
      journable.send(timestamp_attribute)
    end

    # Name of the journable's base class, written to and matched against journals.journable_type.
    def journable_type = journable.class.base_class.name

    # Name of the table the journable itself is stored in.
    def journable_table_name = journable.class.table_name

    # Name of the table holding the journable's data records (the table of its journal class).
    def data_table_name = journable.class.journal_class.table_name

    # Wraps a column (or SQL expression) so that NULL becomes '' and every CRLF is replaced by a line feed.
    #
    # @param column [String, Symbol] the column or expression to normalize
    # @return [String] the REGEXP_REPLACE expression
    def normalize_newlines_sql(column)
      # Single quoted: the backslash sequences are sent to the database as they are and are
      # presumably interpreted by its regex engine.
      crlf_pattern = '\r\n'
      # Double quoted: an actual line feed character is embedded into the SQL literal.
      line_feed = "\n"

      "REGEXP_REPLACE(COALESCE(#{column},''), '#{crlf_pattern}', '#{line_feed}', 'g')"
    end

    # JSON encodes the cause for the journals.cause column; a missing cause is stored as an empty object.
    def cause_sql(cause)
      payload = cause || {}

      # Same encoder mechanism that ActiveRecord uses for json/jsonb columns.
      ActiveSupport::JSON.encode(payload)
    end

    # Because we added the journal via bare metal sql, rails does not yet
    # know of the journal. If the journable has the journals loaded already,
    # the caller might expect the journals to also be updated so we do it for him.
    def reload_journals
      journals = journable.journals
      # An association that has not been loaded will fetch the new journal on first access anyway.
      journals.reload if journals.loaded?
    end

    # Whether the journal to be created can be merged into the predecessor: there has to be
    # a predecessor, aggregation has to be switched on and the predecessor needs to be recent
    # enough, by the same user, of the same cause and must not compete for the notes.
    def aggregatable?(predecessor, notes, cause)
      return false unless predecessor.present? && aggregation_active?

      within_aggregation_time?(predecessor) &&
        same_user?(predecessor) &&
        same_cause?(predecessor, cause) &&
        only_one_note(predecessor, notes)
    end

    # Aggregation is switched on by configuring an aggregation time greater than zero minutes.
    def aggregation_active?
      Setting.journal_aggregation_time_minutes.to_i.positive?
    end

    # Whether the predecessor was last updated within the configured aggregation time.
    def within_aggregation_time?(predecessor)
      aggregation_window = Setting.journal_aggregation_time_minutes.to_i.minutes
      earliest_aggregatable = Time.zone.now - aggregation_window

      predecessor.updated_at >= earliest_aggregatable
    end

    # Whether at most one of the predecessor and the journal to be created carries notes.
    # Only then can the two be aggregated without losing one of the notes.
    #
    # nil is treated like an empty note on both sides so that neither a predecessor without
    # notes nor a caller passing `notes: nil` raises a NoMethodError.
    #
    # @param predecessor the journal to possibly aggregate with; needs to respond to #notes
    # @param notes [String, nil] the notes of the journal to be created
    # @return [Boolean]
    def only_one_note(predecessor, notes)
      predecessor.notes.to_s.empty? || notes.to_s.empty?
    end

    # Whether the predecessor was written by the user the service is run for.
    def same_user?(predecessor)
      author_id = predecessor.user_id
      author_id == user.id
    end

    # Whether predecessor and new journal share their cause. Two blank causes count as equal.
    def same_cause?(predecessor, cause)
      return true if predecessor.cause.blank? && cause.blank?

      predecessor.cause == cause
    end

    # Writes a debug log line stating whether a journal is aggregated or newly inserted.
    def log_journal_creation(predecessor)
      Rails.logger.debug do
        if predecessor
          "Aggregating journal #{predecessor.id} for #{journable_type} ##{journable.id}"
        else
          "Inserting new journal for #{journable_type} ##{journable.id}"
        end
      end
    end

    # Makes OpenProject::SqlSanitization.sanitize available as an instance method. It is called
    # throughout this class with an SQL string and the values for its named placeholders
    # (e.g. :journable_id); presumably it returns the SQL with those values safely bound.
    delegate :sanitize,
             to: ::OpenProject::SqlSanitization
  end
end
# rubocop:enable Rails/SquishedSQLHeredocs