# lib/pg/logical_replication/client.rb
require "pg"
module PG
  module LogicalReplication
    # Wraps a database connection with helpers for managing PostgreSQL logical
    # replication: publications, subscriptions and replication slots.
    #
    # Identifiers are quoted with +quote_ident+ and literals with
    # +escape_literal+ / +escape_string+ on the wrapped connection before they
    # are interpolated into SQL.
    class Client
      attr_reader :connection
      attr_reader :command_builder

      # Type map used to encode query parameters.
      #
      # NOTE(review): the map is memoized on the class, so the one built from the
      # first connection passed in is returned for every later call, whichever
      # connection is given. Confirm this sharing is intended.
      #
      # @param connection [PG::Connection] connection used to build the map on first use
      # @return [PG::BasicTypeMapForQueries]
      def self.type_map_for_queries(connection)
        @type_map_for_queries ||= PG::BasicTypeMapForQueries.new(connection)
      end

      # Type map used to decode result values.
      #
      # NOTE(review): memoized on the class in the same way as
      # +type_map_for_queries+; the first connection's map is reused afterwards.
      #
      # @param connection [PG::Connection] connection used to build the map on first use
      # @return [PG::BasicTypeMapForResults]
      def self.type_map_for_results(connection)
        @type_map_for_results ||= PG::BasicTypeMapForResults.new(connection)
      end

      # @param connection [PG::Connection] Database Connection
      def initialize(connection)
        @connection = connection
        @command_builder = PG::LogicalReplication::CommandBuilder.new(connection)
      end

      # Reports on replication lag from publisher to subscriber nodes
      # This method must be run on the publisher node
      #
      # @return [Array<Hash>] one row per entry in pg_stat_replication, with the
      #   keys "lag_bytes" and "application_name"
      def lag_bytes
        typed_exec(<<~SQL).to_a
          SELECT
          pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes,
          application_name
          FROM
          pg_stat_replication
        SQL
      end

      # Reports on replication bytes of WAL being retained for each replication slot
      # This method must be run on the publisher node
      #
      # Only slots using the 'pgoutput' plugin are included.
      #
      # @return [Array<Hash>] one row per matching slot, with the keys
      #   "retained_bytes" and "slot_name"
      def wal_retained_bytes
        typed_exec(<<~SQL).to_a
          SELECT
          pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes,
          slot_name::TEXT
          FROM
          pg_replication_slots
          WHERE
          plugin = 'pgoutput'
        SQL
      end

      # Creates a subscription to a publisher node
      #
      # When +create_slot+ is explicitly false and no +slot_name+ is supplied
      # (under either a symbol or a string key), the subscription name is used
      # as the slot name. The caller's +options+ hash is never modified.
      #
      # @param name [String] subscription name
      # @param conninfo_hash [Hash] publisher node connection info
      # @param publications [Array<String>] publication names to subscribe to
      # @param options [Hash] optional parameters for CREATE SUBSCRIPTION
      def create_subscription(name, conninfo_hash, publications, options = {})
        slot_name_given = options.key?(:slot_name) || options.key?("slot_name")
        # Compare with false explicitly: an absent create_slot key (nil) must not
        # trigger the slot name default.
        create_slot_disabled = options[:create_slot] == false || options["create_slot"] == false
        # Merge into a copy so the hash passed in by the caller stays untouched.
        options = options.merge(:slot_name => name) if create_slot_disabled && !slot_name_given

        connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
        base_command = <<~SQL
          CREATE SUBSCRIPTION #{connection.quote_ident(name)}
          CONNECTION '#{connection_string}'
          PUBLICATION #{safe_list(publications)}
        SQL
        typed_exec(command_builder.command_with_options(base_command, "WITH", options))
      end

      # Disconnects the subscription and removes it
      #
      # @param name [String] subscription name
      # @param ifexists [Boolean] if true an error is not thrown when the subscription does not exist
      def drop_subscription(name, ifexists = false)
        typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
      end

      # Creates a logical replication slot using the 'pgoutput' plugin
      #
      # @param name [String] logical replication slot name
      def create_logical_replication_slot(name)
        typed_exec("SELECT pg_create_logical_replication_slot(#{connection.escape_literal(name)}, 'pgoutput')")
      end

      # Drops the physical or logical replication slot. Note, you must be on the same database a logical slot was created.
      #
      # @param name [String] replication slot name
      def drop_replication_slot(name)
        typed_exec("SELECT pg_drop_replication_slot(#{connection.escape_literal(name)})")
      end

      # Updates a subscription connection string
      #
      # @param name [String] subscription name
      # @param conninfo_hash [Hash] new external connection hash to the publisher node
      def set_subscription_conninfo(name, conninfo_hash)
        connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash))
        typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'")
      end

      # Changes list of subscribed publications
      #
      # @param name [String] subscription name
      # @param publications [Array<String>] publication names to subscribe to
      # @param options [Hash] optional parameters
      def set_subscription_publications(name, publications, options = {})
        base_command = <<~SQL
          ALTER SUBSCRIPTION #{connection.quote_ident(name)}
          SET PUBLICATION #{safe_list(publications)}
        SQL
        typed_exec(command_builder.command_with_options(base_command, "WITH", options))
      end

      # Fetch missing table information from publisher
      #
      # @param name [String] subscription name
      # @param options [Hash] optional parameters
      def sync_subscription(name, options = {})
        base_command = <<~SQL
          ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION
        SQL
        typed_exec(command_builder.command_with_options(base_command, "WITH", options))
      end

      # Enables the previously disabled subscription
      #
      # @param name [String] subscription name
      def enable_subscription(name)
        typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE")
      end

      # Disables the running subscription
      #
      # @param name [String] subscription name
      def disable_subscription(name)
        typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE")
      end

      # Alters parameters originally set by CREATE SUBSCRIPTION
      #
      # @param name [String] subscription name
      # @param options [Hash] parameters to set
      def alter_subscription_options(name, options)
        base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}"
        typed_exec(command_builder.command_with_options(base_command, "SET", options))
      end

      # Sets the owner of the subscription
      #
      # @param name [String] subscription name
      # @param owner [String] new owner user name
      def set_subscription_owner(name, owner)
        typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
      end

      # Renames the subscription
      #
      # @param name [String] current subscription name
      # @param new_name [String] new subscription name
      def rename_subscription(name, new_name)
        typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
      end

      # Shows status and basic information about all subscriptions
      #
      # @param dbname [String, nil] when given, only rows whose "database_name"
      #   equals this value are returned; the filter is applied in Ruby after
      #   the query has run
      # @return [Array<Hash>] a list of subscriptions
      #   keys:
      #     subscription_name
      #     database_name
      #     owner
      #     worker_count
      #     enabled
      #     subscription_dsn
      #     slot_name
      #     publications
      #     remote_replication_lsn
      #     local_replication_lsn
      def subscriptions(dbname = nil)
        subscriptions = typed_exec(<<~SQL).to_a
          SELECT
          sub.subname::TEXT AS subscription_name,
          pg_database.datname::TEXT AS database_name,
          pg_user.usename::TEXT AS owner,
          COUNT(sub_stat.pid) AS worker_count,
          sub.subenabled AS enabled,
          sub.subconninfo AS subscription_dsn,
          sub.subslotname::TEXT AS slot_name,
          sub.subpublications AS publications,
          stat.remote_lsn::TEXT AS remote_replication_lsn,
          stat.local_lsn::TEXT AS local_replication_lsn
          FROM
          pg_subscription AS sub
          JOIN pg_user
          ON sub.subowner = usesysid
          JOIN pg_database
          ON sub.subdbid = pg_database.oid
          LEFT JOIN pg_replication_origin_status stat
          ON concat('pg_', sub.oid) = stat.external_id
          LEFT JOIN pg_stat_subscription sub_stat
          ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL
          GROUP BY
          sub.subname,
          pg_database.datname,
          pg_user.usename,
          sub.subenabled,
          sub.subconninfo,
          sub.subslotname,
          sub.subpublications,
          stat.remote_lsn,
          stat.local_lsn
        SQL
        dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions
      end

      # Returns if this database is subscribing to any publications
      #
      # @param dbname [String, nil] optional database name passed through to #subscriptions
      # @return [Boolean] true if there are any subscriptions, false otherwise
      def subscriber?(dbname = nil)
        subscriptions(dbname).any?
      end

      # Lists the current replication slots
      #
      # The result object is returned as-is (it is not converted to an Array).
      #
      # @return [PG::Result] one row per slot with the columns slot_name, plugin,
      #   slot_type, database, temporary and active
      def replication_slots
        typed_exec(<<~SQL)
          SELECT
          slot_name::TEXT,
          plugin::TEXT,
          slot_type::TEXT,
          database::TEXT,
          temporary,
          active
          FROM pg_replication_slots
        SQL
      end

      # Lists the current publications
      #
      # The result object is returned as-is (it is not converted to an Array).
      #
      # @return [PG::Result] one row per publication with the columns name,
      #   owner, puballtables, pubinsert, pubupdate and pubdelete
      def publications
        typed_exec(<<~SQL)
          SELECT
          pubname::TEXT AS name,
          usename::TEXT AS owner,
          puballtables,
          pubinsert,
          pubupdate,
          pubdelete
          FROM
          pg_publication
          JOIN pg_user ON pubowner = usesysid
        SQL
      end

      # Checks whether a publication with the given name exists
      #
      # @param publication_name [String] publication name to look for
      # @return [Boolean] true if a row from #publications has that name
      def publishes?(publication_name)
        publications.any? { |p| p["name"] == publication_name }
      end

      # Creates a new publication
      #
      # @param name [String] publication name
      # @param all_tables [Boolean] replicate changes for all tables, including ones created in the future
      # @param tables [Array<String>] tables to be added to the publication, ignored if all_tables is true
      # @param options [Hash] optional parameters
      def create_publication(name, all_tables = false, tables = [], options = {})
        base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}"
        if all_tables
          base_command << " FOR ALL TABLES"
        elsif !tables.empty?
          base_command << " FOR TABLE #{safe_list(tables)}"
        end
        typed_exec(command_builder.command_with_options(base_command, "WITH", options))
      end

      # Adds tables to a publication
      #
      # @param name [String] publication name
      # @param tables [Array<String>] table names to add
      def add_tables_to_publication(name, tables)
        typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}")
      end

      # Sets the tables included in a publication
      #
      # @param name [String] publication name
      # @param tables [Array<String>] table names
      def set_publication_tables(name, tables)
        typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}")
      end

      # Removes tables from a publication
      #
      # @param name [String] publication name
      # @param tables [Array<String>] table names to remove
      def remove_tables_from_publication(name, tables)
        typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}")
      end

      # Alters parameters originally set by CREATE PUBLICATION
      #
      # @param name [String] publication name
      # @param options [Hash] parameters to set
      def alter_publication_options(name, options)
        base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}"
        typed_exec(command_builder.command_with_options(base_command, "SET", options))
      end

      # Sets the owner of a publication
      #
      # @param name [String] publication name
      # @param owner [String] new owner user name
      def set_publication_owner(name, owner)
        typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}")
      end

      # Renames a publication
      #
      # @param name [String] current publication name
      # @param new_name [String] new publication name
      def rename_publication(name, new_name)
        typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}")
      end

      # Remove a publication
      #
      # @param name [String] publication name
      # @param ifexists [Boolean] if true an error is not thrown when the publication does not exist
      def drop_publication(name, ifexists = false)
        typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}")
      end

      # Lists the tables currently in the publication
      #
      # The publication name is sent as a bound parameter ($1), not interpolated.
      #
      # @param name [String] publication name
      # @return [Array<String>] table names
      def tables_in_publication(name)
        typed_exec(<<~SQL, name).values.flatten
          SELECT tablename::TEXT
          FROM pg_publication_tables
          WHERE pubname = $1
        SQL
      end

      private

      # Quotes every element as an SQL identifier and joins them with ", ".
      #
      # @param list [Array<String>] identifiers to quote
      # @return [String] comma separated list of quoted identifiers
      def safe_list(list)
        list.map { |e| connection.quote_ident(e) }.join(", ")
      end

      # Runs +sql+ on the connection with the class-level query type map and
      # applies the class-level result type map to the outcome.
      #
      # NOTE(review): newer pg releases appear to deprecate passing parameters to
      # async_exec in favour of async_exec_params -- confirm against the pg
      # version this project pins before changing the call.
      #
      # @param sql [String] statement to execute
      # @param params [Array] values bound to $1, $2, ... placeholders in +sql+
      # @return the value returned by +map_types!+ on the query result
      def typed_exec(sql, *params)
        result = connection.async_exec(sql, params, nil, self.class.type_map_for_queries(connection))
        result.map_types!(self.class.type_map_for_results(connection))
      end
    end
  end
end