lib/sequent/migrations/view_schema.rb
# frozen_string_literal: true
require 'parallel'
require 'postgresql_cursor'
require_relative 'errors'
require_relative '../support/database'
require_relative '../sequent'
require_relative '../util/timer'
require_relative '../util/printer'
require_relative './projectors'
require_relative 'planner'
require_relative 'executor'
require_relative 'sql'
require_relative 'versions'
module Sequent
  module Migrations
    ##
    # ViewSchema is used for migration of your view_schema. For instance
    # when you create new Projectors or change existing Projectors.
    #
    # The following migrations are supported:
    #
    # - ReplayTable (Projector migrations)
    # - AlterTable (For instance if you introduce a new column)
    #
    # To maintain your migrations you need to:
    # 1. Create a class that extends `Sequent::Migrations::Projectors`
    #    and specify it in `Sequent.configuration.migrations_class_name`
    # 2. Define per version which migrations you want to execute
    #    See the definition of `Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`
    # 3. Specify in Sequent where your sql files reside (Sequent.configuration.migration_sql_files_directory)
    # 4. Ensure that you add %SUFFIX% to each name that needs to be unique in postgres
    #    (like TABLE names, INDEX names, PRIMARY KEYS)
    #    E.g. `create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`
    # 5. If you want to run an `alter_table` migration ensure that
    #    a sql file named `table_name_VERSION.sql` exists.
    #
    # Example:
    #
    #   class AppMigrations < Sequent::Migrations::Projectors
    #     def self.version
    #       '3'
    #     end
    #
    #     def self.versions
    #       {
    #         '1' => [Sequent.all_projectors],
    #         '2' => [
    #           UserProjector,
    #           InvoiceProjector,
    #         ],
    #         '3' => [
    #           Sequent::Migrations.alter_table(UserRecord)
    #         ]
    #
    #       }
    #     end
    #
    #   end
    class ViewSchema
      # Corresponds with the index on aggregate_id column in the event_records table
      #
      # Since we replay in batches of the first 3 chars of the uuid we created an index on
      # these 3 characters. Hence the name ;-)
      #
      # This also means that the online replay is divided up into 16**3 groups.
      # That might seem like a lot for a fresh event store, but as the number of
      # events grows you will see that this partitions the work well.
      LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE = 3

      include Sequent::Util::Timer
      include Sequent::Util::Printer
      include Sql

      attr_reader :view_schema, :db_config, :logger

      class << self
        # @see #create_view_tables
        # @param env [String] The environment used for connecting the database
        # @raise [ArgumentError] when +env+ is blank
        def create_view_tables(env:)
          fail ArgumentError, 'env is required' if env.blank?

          db_config = Sequent::Support::Database.read_config(env)
          Sequent::Support::Database.establish_connection(db_config)
          new(db_config: db_config).create_view_tables
        end

        # @see #create_view_schema_if_not_exists
        # @param env [String] The environment used for connecting the database
        # @raise [ArgumentError] when +env+ is blank
        def create_view_schema_if_not_exists(env:)
          fail ArgumentError, 'env is required' if env.blank?

          db_config = Sequent::Support::Database.read_config(env)
          Sequent::Support::Database.establish_connection(db_config)
          new(db_config: db_config).create_view_schema_if_not_exists
        end
      end

      # @param db_config [Hash] database configuration, used to (re)connect and to
      #   set the schema search path
      def initialize(db_config:)
        @db_config = db_config
        @view_schema = Sequent.configuration.view_schema_name
        @logger = Sequent.logger
      end

      ##
      # Returns the current version from the database
      def current_version
        Versions.current_version
      end

      ##
      # Utility method that creates all tables in the view schema
      #
      # This method is mainly useful in test scenarios to just create
      # the entire view schema without replaying the events.
      #
      # Does nothing when no migration class is configured or when the
      # database is already at +Sequent.new_version+.
      def create_view_tables
        create_view_schema_if_not_exists
        return if Sequent.migration_class == Sequent::Migrations::Projectors
        return if Sequent.new_version == current_version

        in_view_schema do
          sql_dir = Sequent.configuration.migration_sql_files_directory
          Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table|
            exec_statements_from_sql_file("#{sql_dir}/#{table.table_name}.sql")

            # The indexes file is optional.
            indexes_file_name = "#{sql_dir}/#{table.table_name}.indexes.sql"
            exec_statements_from_sql_file(indexes_file_name) if File.exist?(indexes_file_name)
          end
          Versions.create!(version: Sequent.new_version)
        end
      end

      ##
      # Utility method that replays events for all managed_tables from all Sequent::Core::Projectors
      #
      # This method is mainly useful in test scenarios or development tasks
      #
      # @param group_exponent [Integer] events are replayed in 16**group_exponent groups
      def replay_all!(group_exponent: 1)
        replay!(
          Sequent.configuration.online_replay_persistor_class.new,
          projectors: Core::Migratable.projectors,
          groups: groups(group_exponent: group_exponent),
        )
      end

      ##
      # Utility method that creates the view_schema and the meta data tables
      #
      # This method is mainly useful during an initial setup of the view schema
      def create_view_schema_if_not_exists
        exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema}))
        migrate_metadata_tables
      end

      # The (memoized) migration plan from the current version to +Sequent.new_version+.
      def plan
        @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version)
      end

      # The (memoized) executor that applies the plan's migrations.
      def executor
        @executor ||= Executor.new
      end

      ##
      # First part of a view schema migration
      #
      # Call this method while your application is running.
      # The online part consists of:
      #
      # 1. Ensure any previous migrations are cleaned up
      # 2. Create new tables for the Projectors which need to be migrated to the new version
      #   These tables will be called `table_name_VERSION`.
      # 3. Replay all events to populate the tables
      #   It keeps track of all events that are already replayed.
      # 4. Resets the table names of the activerecord models (projections)
      #   back to their original values (so without the VERSION suffix)
      #
      # If anything fails an exception is raised and everything is rolled back
      #
      # @raise ConcurrentMigration if migration is already running
      # @raise InvalidMigrationDefinition if the migration plan is invalid
      def migrate_online
        ensure_valid_plan!
        migrate_metadata_tables

        return if Sequent.new_version == current_version

        ensure_version_correct!

        Sequent.logger.info("Start migrate_online for version #{Sequent.new_version}")

        in_view_schema do
          Versions.start_online!(Sequent.new_version)

          drop_old_tables(Sequent.new_version)
          executor.execute_online(plan)
        end

        if plan.projectors.any?
          replay!(
            Sequent.configuration.online_replay_persistor_class.new,
            groups: groups,
            maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id,
          )
        end

        in_view_schema do
          executor.create_indexes_after_execute_online(plan)
          executor.reset_table_names(plan)
          Versions.end_online!(Sequent.new_version)
        end
        Sequent.logger.info("Done migrate_online for version #{Sequent.new_version}")
      rescue ConcurrentMigration
        # Do not rollback the migration when this is a concurrent migration as the other one is running
        raise
      rescue InvalidMigrationDefinition
        # Do not rollback the migration since there is nothing to rollback
        raise
      rescue Exception => e # rubocop:disable Lint/RescueException
        # Deliberately broad (includes interrupts) so a half-finished migration is always rolled back.
        rollback_migration
        raise e
      end

      ##
      # Last part of a view schema migration
      #
      # +You have to ensure no events are being added to the event store while this method is running.+
      # For instance put your application in maintenance mode.
      #
      # The offline part consists of:
      #
      # 1. Replay all events not yet replayed since #migrate_online
      # 2. Within a single transaction do:
      # 2.1 Rename current tables with the +current version+ as SUFFIX
      # 2.2 Rename the new tables and remove the +new version+ suffix
      # 2.3 Add the new version in the +Versions+ table
      # 3. Update the versions table to complete the migration
      #
      # If anything fails an exception is raised and everything is rolled back
      #
      # When this method succeeds you can safely start the application from Sequent's point of view.
      #
      def migrate_offline
        return if Sequent.new_version == current_version

        ensure_version_correct!
        in_view_schema { Versions.start_offline!(Sequent.new_version) }
        Sequent.logger.info("Start migrate_offline for version #{Sequent.new_version}")

        executor.set_table_names_to_new_version(plan)

        # 1 replay events not yet replayed
        if plan.projectors.any?
          replay!(
            Sequent.configuration.offline_replay_persistor_class.new,
            groups: groups(group_exponent: 1),
            minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id,
          )
        end

        in_view_schema do
          Sequent::ApplicationRecord.transaction do
            # 2.1, 2.2
            executor.execute_offline(plan, current_version)
            # 2.3 Create migration record
            Versions.end_offline!(Sequent.new_version)
          end
        end
        logger.info "Migrated to version #{Sequent.new_version}"
      rescue ConcurrentMigration
        raise
      rescue MigrationDone
        # no-op same as Sequent.new_version == current_version
      rescue Exception => e # rubocop:disable Lint/RescueException
        rollback_migration
        raise e
      end

      private

      # Computes (and memoizes) the plan so an invalid migration definition
      # raises before any change is made.
      def ensure_valid_plan!
        plan
      end

      # Creates/updates the migration metadata (versions) table in the view schema.
      def migrate_metadata_tables
        Sequent::ApplicationRecord.transaction do
          in_view_schema do
            exec_sql([Versions.migration_sql].join("\n"))
          end
        end
      end

      # @raise [ArgumentError] when +Sequent.new_version+ is lower than the current version
      def ensure_version_correct!
        create_view_schema_if_not_exists
        new_version = Sequent.new_version
        # Read once: each call queries the database.
        current = current_version

        if new_version < current
          fail ArgumentError,
               "new_version [#{new_version}] must be greater or equal to current_version [#{current}]"
        end
      end

      # Replays the events handled by +projectors+ in parallel, one worker task per group of
      # aggregate id prefixes.
      #
      # @param replay_persistor persistor used by projectors that do not declare their own
      # @param groups [Array<Array<String>>] groups of aggregate id prefixes
      # @param minimum_xact_id_inclusive [Integer, nil] only replay events with xact_id >= this value
      # @param maximum_xact_id_exclusive [Integer, nil] only replay events with xact_id < this value
      #   (or without xact_id)
      # @raise [RuntimeError] when any group fails to replay
      def replay!(
        replay_persistor,
        groups:,
        projectors: plan.projectors,
        minimum_xact_id_inclusive: nil,
        maximum_xact_id_exclusive: nil
      )
        logger.info "groups: #{groups.size}"

        with_sequent_config(replay_persistor, projectors) do
          logger.info 'Start replaying events'

          time("#{groups.size} groups replayed") do
            event_types = projectors.flat_map { |projector| projector.message_mapping.keys }.uniq.map(&:name)
            # Disconnect before starting the workers; each worker connects lazily on its first
            # group (presumably so workers do not reuse the parent's connection).
            disconnect!

            @connected = false
            # using `map_with_index` because https://github.com/grosser/parallel/issues/175
            result = Parallel.map_with_index(
              groups,
              in_processes: Sequent.configuration.number_of_replay_processes,
            ) do |aggregate_prefixes, index|
              @connected ||= establish_connection
              msg = <<~EOS.chomp
                Group (#{aggregate_prefixes.first}-#{aggregate_prefixes.last}) #{index + 1}/#{groups.size} replayed
              EOS
              time(msg) do
                replay_events(
                  aggregate_prefixes,
                  event_types,
                  minimum_xact_id_inclusive,
                  maximum_xact_id_exclusive,
                  replay_persistor,
                  &on_progress
                )
              end
              nil
            rescue StandardError => e
              logger.error "Replaying failed for ids: ^#{aggregate_prefixes.first} - #{aggregate_prefixes.last}"
              logger.error '+++++++++++++++ ERROR +++++++++++++++'
              recursively_print(e)
              raise Parallel::Kill # immediately kill all sub-processes
            end
            establish_connection
            # Parallel.map returns nil when a worker raised Parallel::Kill (see rescue above).
            fail 'Replaying events failed, see the logged error above' if result.nil?
          end
        end
      end

      # Replays the events of one group of aggregate id prefixes, then commits the shared
      # replay persistor and every projector that declares its own replay persistor.
      def replay_events(
        aggregate_prefixes,
        event_types,
        minimum_xact_id_inclusive,
        maximum_xact_id_exclusive,
        replay_persistor,
        &on_progress
      )
        Sequent.configuration.event_store.replay_events_from_cursor(
          block_size: 1000,
          get_events: -> {
            event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
          },
          on_progress: on_progress,
        )

        replay_persistor.commit

        # Also commit all specific declared replay persistors on projectors.
        # NOTE(review): calls `commit` on the projector itself; presumably it delegates to its persistor.
        Sequent.configuration.event_handlers.select { |e| e.class.replay_persistor }.each(&:commit)
      end

      # Reconnects, drops the tables created for the new version, restores the table names
      # and marks the new version as rolled back.
      def rollback_migration
        disconnect!
        establish_connection
        drop_old_tables(Sequent.new_version)

        executor.reset_table_names(plan)
        Versions.rollback!(Sequent.new_version)
      end

      # @param group_exponent [Integer] number of groups is 16**group_exponent
      # @param limit [Integer, nil] maximum number of groups to return
      # @param offset [Integer, nil] number of leading groups to skip
      # @return [Array<Array<String>>] groups of aggregate id prefixes
      def groups(group_exponent: 3, limit: nil, offset: nil)
        number_of_groups = 16**group_exponent
        groups = groups_of_aggregate_id_prefixes(number_of_groups)
        groups = groups.drop(offset) unless offset.nil?
        groups = groups.take(limit) unless limit.nil?
        groups
      end

      # Splits all zero-padded hexadecimal aggregate id prefixes (e.g. "000".."fff") into
      # consecutive slices of equal size.
      #
      # @param number_of_groups [Integer] requested number of groups; should divide the number of
      #   prefixes for the result to contain exactly that many groups
      # @raise [RuntimeError] when number_of_groups is < 1 or larger than the number of prefixes
      def groups_of_aggregate_id_prefixes(number_of_groups)
        prefix_length = LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE
        all_prefixes = (0...16**prefix_length).map { |i| i.to_s(16).rjust(prefix_length, '0') }

        logger.info "Number of groups #{number_of_groups}"
        logger.debug "Prefixes: #{all_prefixes.length}"

        # Guards against a zero/fractional slice size below (e.g. 16**-1 is a Rational).
        fail "Number of groups must be at least 1, got #{number_of_groups}" if number_of_groups < 1

        if number_of_groups > all_prefixes.length
          fail "Can not have more groups #{number_of_groups} than number of prefixes #{all_prefixes.length}"
        end

        all_prefixes.each_slice(all_prefixes.length / number_of_groups).to_a
      end

      # Runs the block with the database search path set to the view schema.
      def in_view_schema(&block)
        Sequent::Support::Database.with_schema_search_path(view_schema, db_config, &block)
      end

      # Drops every view schema table ending in `_VERSION`, for each version from
      # (current_version - 10) up to and including +new_version+.
      def drop_old_tables(new_version)
        versions_to_check = (current_version - 10)..new_version
        old_tables = versions_to_check.flat_map do |old_version|
          # `_` is a single-character wildcard in LIKE, so it is escaped (with `!`) to only match a
          # literal `_VERSION` suffix. Unescaped, `'%_3'` would also match e.g. `report_q3` or
          # `stats_2023`, and those tables would be dropped below.
          exec_sql(<<~SQL).flat_map(&:values)
            select table_name from information_schema.tables where table_schema = '#{Sequent.configuration.view_schema_name}' and table_name LIKE '%!_#{old_version}' ESCAPE '!'
          SQL
        end
        old_tables.each do |old_table|
          exec_sql("DROP TABLE #{Sequent.configuration.view_schema_name}.#{old_table} CASCADE")
        end
      end

      # Progress callback for the event replay; only prints once some progress was made.
      def on_progress
        ->(progress, done, ids) do
          Sequent::Core::EventStore::PRINT_PROGRESS[progress, done, ids] if progress > 0
        end
      end

      # Runs the block with a copy of the Sequent configuration whose event handlers are fresh
      # instances of +projectors+ (each using its own replay persistor, or +replay_persistor+)
      # and with no transactions. The original configuration is restored afterwards.
      def with_sequent_config(replay_persistor, projectors, &block)
        old_config = Sequent.configuration

        config = Sequent.configuration.dup

        replay_projectors = projectors.map do |projector_class|
          projector_class.new(projector_class.replay_persistor || replay_persistor)
        end
        config.transaction_provider = Sequent::Core::Transactions::NoTransactions.new
        config.event_handlers = replay_projectors

        Sequent::Configuration.restore(config)

        block.call
      ensure
        Sequent::Configuration.restore(old_config)
      end

      # Builds the query for events of +event_types+ whose aggregate id starts with one of
      # +aggregate_prefixes+, optionally bounded by transaction id, ordered per aggregate.
      #
      # @raise [ArgumentError] when aggregate_prefixes is empty
      def event_stream(aggregate_prefixes, event_types, minimum_xact_id_inclusive, maximum_xact_id_exclusive)
        fail ArgumentError, 'aggregate_prefixes is mandatory' unless aggregate_prefixes.present?

        event_stream = Sequent.configuration.event_record_class.where(event_type: event_types)
        event_stream = event_stream.where(<<~SQL, aggregate_prefixes)
          substring(aggregate_id::text from 1 for #{LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE}) in (?)
        SQL
        if minimum_xact_id_inclusive && maximum_xact_id_exclusive
          event_stream = event_stream.where(
            'xact_id >= ? AND xact_id < ?',
            minimum_xact_id_inclusive,
            maximum_xact_id_exclusive,
          )
        elsif minimum_xact_id_inclusive
          event_stream = event_stream.where('xact_id >= ?', minimum_xact_id_inclusive)
        elsif maximum_xact_id_exclusive
          event_stream = event_stream.where('xact_id IS NULL OR xact_id < ?', maximum_xact_id_exclusive)
        end
        event_stream
          .order('aggregate_id ASC, sequence_number ASC')
          .select('id, event_type, event_json, sequence_number')
      end

      # Reads +file_name+, strips the %SUFFIX% placeholder and executes each statement.
      def exec_statements_from_sql_file(file_name)
        statements = sql_file_to_statements(file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') }
        statements.each { |statement| exec_sql(statement) }
      end

      ## shortcut methods
      def disconnect!
        Sequent::Support::Database.disconnect!
      end

      def establish_connection
        Sequent::Support::Database.establish_connection(db_config)
      end
    end
  end
end