th-pg-config=/path/to/pg_config/gems/pg-0.18.1/sample/wal_shipper.rb
#!/usr/bin/env ruby
#
# A script to wrap ssh and rsync for PostgreSQL WAL files shipping.
# Mahlon E. Smith <mahlon@martini.nu>
#
# Based off of Joshua Drake's PITRTools concept, but with some important
# differences:
#
#   - Only supports PostgreSQL >= 8.3
#   - No support for rsync version < 3
#   - Only shipping, no client side sync (too much opportunity for failure,
#     and it's easy to get a base backup manually)
#   - WAL files are only stored once, regardless of how many
#     slaves are configured or not responding, and are removed from
#     the master when they are no longer needed.
#   - Each slave can have completely distinct settings, instead
#     of a single set of options applied to all slaves
#   - Slave sync can be individually paused from the master
#   - Can run synchronously or, if you have a lot of slaves, in threaded async mode
#   - It's ruby, instead of python. :)
#
# wal_shipper is configurable via an external YAML file, and will create
# a template on its first run -- you'll need to modify it! It expects
# a directory structure like so:
#
#    postgres/
#        data/...
#        bin/wal_shipper.rb
#        etc/wal_shipper.conf   <-- YAML settings!
#        wal/
#
# It should be loaded from the PostgreSQL master's postgresql.conf
# as such, after putting it into your postgres user homedir under 'bin':
#
# archive_command = '/path/to/postgres_home/bin/wal_shipper.rb %p'
#
# Passwordless ssh keys need to be set up for the postgres user on all
# participating masters and slaves.
#
# You can use any replay method of your choosing on the slaves.
# Here's a nice example using pg_standby, to be put in data/recovery.conf:
#
# restore_command = 'pg_standby -t /tmp/pgrecovery.done -s5 -w0 -c /path/to/postgres_home/wal_files/ %f %p %r'
#
# Or, here's another simple alternative data/recovery.conf, for using WAL shipping
# alongside streaming replication:
#
# standby_mode = 'on'
# primary_conninfo = 'host=master.example.com port=5432 user=repl password=XXXXXXX'
# restore_command = 'cp /usr/local/pgsql/wal/%f %p'
# trigger_file = '/usr/local/pgsql/pg.become_primary'
# archive_cleanup_command = '/usr/local/bin/pg_archivecleanup /usr/local/pgsql/wal %r'
#
#========================================================================================
require 'pathname'
require 'yaml'
require 'fileutils'
require 'ostruct'
### Encapsulate WAL shipping functionality.
###
module WalShipper
### Send messages to the PostgreSQL log files.
###
### Prints +msg+ on standard output with a "WAL Shipper: " prefix, but only
### when the including object's @debug flag is set; otherwise nothing is
### written.  Returns nil either way.
###
def log( msg )
  puts "WAL Shipper: %s" % [ msg ] if @debug
end
### An object that represents a single destination from the
### configuration file.
###
class Destination < OpenStruct
include WalShipper
### Create a new WalShipper::Destination object.
###
### +dest+ holds the settings of one destination and is handed to OpenStruct,
### so every key becomes an attribute.  +debug+ controls whether #log prints
### anything for this object.  The new object validates itself right away.
def initialize( dest, debug = false )
  # Stored first so that problems found while validating can be logged.
  @debug = debug
  super( dest )
  validate
end
#########
protected
#########
### Check for required keys and normalize various keys.
###
### * Sets +invalid+ to true (and logs why) when 'label' or 'kind' is missing.
### * 'file' destinations: a configured path is converted to a Pathname.
### * 'rsync-ssh' destinations: the port defaults to 22, and the user is
###   rewritten into its "user@" prefix form ('' when no user is set).
###
def validate
  # Check for required destination keys
  %w[ label kind ].each do |key|
    next unless self.send( key.to_sym ).nil?

    self.log "Destination %p missing required '%s' key." % [ self, key ]
    self.invalid = true
  end

  # Ensure paths are Pathnames for the 'file' destination type.  A missing
  # path stays nil instead of being passed to Pathname.new, which raises a
  # TypeError on nil and would abort the run for every destination; the
  # 'file' shipping method checks for a nil path itself.
  if self.kind == 'file' && !self.path.nil?
    self.path = Pathname.new( self.path )
  end

  if self.kind == 'rsync-ssh'
    self.port ||= 22
    self.user = self.user ? "#{self.user}@" : ''
  end
end
end # Class Destination
### Class for creating new Destination objects and determining how to
### ship WAL files to them.
###
class Dispatcher
include WalShipper
### Create a new Dispatcher, given a +wal+ file Pathname object and a +conf+
### hash.
###
### Every key of +conf+ becomes an instance variable ('spool' -> @spool, and
### so on).  Raises a RuntimeError when the configured spool directory does
### not exist, and exits the process with status 1 when shipping is disabled.
### Otherwise the destinations are instantiated, their spool directories are
### created, and +wal+ is copied into the spool's 'wal_segments' directory;
### @wal refers to that copy afterwards.
###
def initialize( wal, conf )
  # Make the config keys instance variables.
  conf.each_pair {|key, val| self.instance_variable_set( "@#{key}", val ) }

  # Spool directory check.
  #
  @spool = Pathname.new( @spool )
  @spool.exist? or raise "The configured spool directory (%s) doesn't exist." % [ @spool ]

  # Stop right away if we have disabled shipping.
  # NOTE(review): presumably the non-zero exit status is what makes PostgreSQL
  # keep the segment and retry the archive_command later -- confirm against
  # the PostgreSQL archiving documentation.
  #
  unless @enabled
    self.log "WAL shipping is disabled, queuing segment %s" % [ wal.basename ]
    exit 1
  end

  # Instantiate Destination objects and drop the ones that failed validation.
  # The filtered list must be assigned back: Array#reject returns a new array,
  # so without the assignment invalid destinations (which never receive a
  # spool directory) would remain in @destinations.  A configuration without
  # a 'destinations' key is treated as an empty list.
  #
  @destinations = ( @destinations || [] ).
    collect {|dest| WalShipper::Destination.new( dest, @debug ) }.
    reject {|dest| dest.invalid }

  # Create a spool directory for each remaining destination.
  #
  @destinations.each do |dest|
    dest.spool = @spool + dest.label
    dest.spool.mkdir( 0711 ) unless dest.spool.exist?
  end

  # Put the WAL file into the spool for processing!
  #
  @waldir = @spool + 'wal_segments'
  @waldir.mkdir( 0711 ) unless @waldir.exist?

  self.log "Copying %s to %s" % [ wal.basename, @waldir ]
  FileUtils::cp wal, @waldir

  # 'wal' now references the copy. The original is managed and auto-expired
  # by PostgreSQL when a new checkpoint segment is reached.
  @wal = @waldir + wal.basename
end
### Hardlink the spooled WAL file into the spool directory of every
### destination, replacing a link of the same name if one is already there.
### Each destination thereby gets its own queue, which records what still has
### to be shipped to it.
###
def link
  @destinations.each do |target|
    queue_dir = target.spool
    self.log "Linking %s into %s" % [ @wal.basename, queue_dir.basename ]
    FileUtils.ln( @wal, queue_dir, :force => true )
  end
end
### Decide to be synchronous or threaded, and delegate each destination
### to the proper ship method.
###
### * @async unset: destinations are dispatched one after another.
### * @async set, @async_max missing or not a positive number: one thread per
###   destination, all started together.
### * @async set, @async_max positive: one thread per destination, started in
###   batches of at most @async_max; a batch is joined before the next starts.
###
### Always returns nil.
###
def dispatch
  # Synchronous mode.
  #
  unless @async
    self.log "Performing a synchronous dispatch."
    @destinations.each {|dest| self.dispatch_dest( dest ) }
    return
  end

  # Coerce the limit once, so that deciding on a mode and cutting the batches
  # use the same number.  Anything that isn't positive means "no limit";
  # a negative limit must never reach the batching code.
  batch_size = @async_max.to_i

  if batch_size > 0
    # Async, one thread per destination, in groups of async_max size.
    #
    self.log "Performing an asynchronous dispatch: one thread per destination, %d at a time." % [ batch_size ]
    batches = @destinations.each_slice( batch_size ).to_a
  else
    # Async, one thread per destination
    #
    self.log "Performing an asynchronous dispatch: one thread per destination."
    batches = [ @destinations ]
  end

  batches.each do |batch|
    workers = batch.collect do |dest|
      Thread.new do
        # A failure in any worker should take the whole run down.
        Thread.current.abort_on_exception = true
        self.dispatch_dest( dest )
      end
    end
    workers.each {|worker| worker.join }
  end

  return nil
end
### Remove any WAL segments no longer needed by slaves.
###
### An entry of the WAL spool directory is deleted once its hardlink count is
### down to one, i.e. no destination spool links to it any more.  The number
### of removed segments is logged.
###
def clean_spool
  # The tally is built from the return value of Pathname#unlink.
  removed = @waldir.children.inject( 0 ) do |count, segment|
    segment.stat.nlink == 1 ? count + segment.unlink : count
  end

  self.log "Removed %d WAL segment%s." % [ removed, removed == 1 ? '' : 's' ]
end
#########
protected
#########
### Send WAL segments to remote +dest+ via rsync+ssh.
### Passwordless keys between the user running this script (postmaster owner)
### and remote user need to be set up in advance.
###
### Ships the destination's whole spool directory when it holds more than one
### entry, otherwise only the current segment.  rsync is run with
### --remove-source-files, so shipped segments leave the queue.  When rsync
### fails, the error is logged and the configured error command (if any) is
### run with the destination label.
###
### NOTE(review): the ssh options built here disable host key checking.
###
def ship_rsync_ssh( dest )
  if dest.host.nil?
    # Report the destination itself; dest.host is always nil at this point.
    self.log "Destination %p missing required 'host' key. WAL is queued." % [ dest ]
    return
  end

  rsync_flags = '-zc'.dup
  ssh_string = "%s -o ConnectTimeout=%d -o StrictHostKeyChecking=no -p %d" %
    [ @ssh, @ssh_timeout || 10, dest.port ]
  dst_string = "%s%s:%s/" % [ dest.user, dest.host, dest.path ]

  # If there are numerous files in the spool dir, it means there was
  # an error transferring to this host in the past. Try and ship all
  # WAL segments, instead of just the new one. PostgreSQL on the slave
  # side will "do the right thing" as they come in, regardless of
  # ordering.
  #
  if dest.spool.children.length > 1
    src_string = dest.spool.to_s + '/'
    rsync_flags << 'r'
  else
    # Kernel#system only takes Strings, so the Pathname is converted here.
    src_string = ( dest.spool + @wal.basename ).to_s
  end

  rsync_flags << ( @debug ? 'vh' : 'q' )

  ship_wal_cmd = [
    @rsync,
    rsync_flags,
    '--remove-source-files',
    '-e', ssh_string,
    src_string, dst_string
  ]

  self.log "Running command '%s'" % [ ship_wal_cmd.join(' ') ]
  shipped = system( *ship_wal_cmd )

  # Run external notification program on error, if one is configured.
  # system returns false on a non-zero exit and nil when rsync could not be
  # started at all; both count as a failure.
  #
  unless shipped
    self.log "Ack! Error while shipping to %p, WAL is queued." % [ dest.label ]
    system @error_cmd, dest.label if @error_cmd
  end
end
### Copy WAL segments to remote path as set in +dest+.
### This is useful for longer term PITR, copying to NFS shares, etc.
###
### Does nothing but log when the destination has no path.  The target
### directory is created if it is missing.  A segment is removed from the
### destination's spool only after #copy_file reported success for it.
###
def ship_file( dest )
  target_dir = dest.path
  if target_dir.nil?
    self.log "Destination %p missing required 'path' key. WAL is queued." % [ dest ]
    return
  end

  target_dir.mkdir( 0711 ) unless target_dir.exist?

  # Copy one queued segment and take it off the queue if that worked.
  deliver = lambda do |segment|
    segment.unlink if self.copy_file( segment, target_dir, dest.label, dest.compress )
  end

  # More than one file in the spool dir means an earlier transfer to this
  # destination failed, so everything that is queued gets shipped, not just
  # the new segment. PostgreSQL on the slave side will "do the right thing"
  # as they come in, regardless of ordering.
  #
  queued = dest.spool.children
  if queued.length > 1
    queued.each( &deliver )
  else
    deliver.call( dest.spool + @wal.basename )
  end
end
### Given a +wal+ Pathname, a +path+ destination, and the destination
### label, copy and optionally compress a WAL file.
###
### Returns true when the copy (and, with +compress+ set, the gzip run)
### succeeded.  Any StandardError on the way is logged, reported through the
### configured error command (run with +label+ as its argument), and turned
### into a false return value.
###
def copy_file( wal, path, label, compress=false )
  dest_file = path + wal.basename
  FileUtils::cp wal, dest_file

  if compress
    # Kernel#system only takes Strings; handing it the Pathname raises a
    # TypeError, which the rescue below would report as a failed copy.
    compressed = system( 'gzip', '-f', dest_file.to_s )
    raise "Error while compressing: %s" % [ wal.basename ] unless compressed
  end

  self.log "Copied %s%s to %s." %
    [ wal.basename, compress ? ' (and compressed)' : '', path ]
  return true

rescue => err
  self.log "Ack! Error while copying '%s' (%s) to %p, WAL is queued." %
    [ wal.basename, err.message, path ]
  system @error_cmd, label if @error_cmd
  return false
end
### Figure out how to send the WAL file to its intended destination +dest+.
###
### A destination whose 'enabled' setting is explicitly false is skipped.
### Otherwise the 'kind' is mapped onto a ship_* method of the Dispatcher
### ( rsync-ssh --> ship_rsync_ssh ), which is called with +dest+; a kind
### without such a method is only logged.
###
def dispatch_dest( dest )
  # A missing 'enabled' key counts as enabled; only a literal false skips.
  if dest.enabled == false
    self.log "Skipping explicity disabled destination %p, WAL is queued." % [ dest.label ]
    return
  end

  shipper = :"ship_#{dest.kind.tr( '-', '_' )}"
  return self.send( shipper, dest ) if WalShipper::Dispatcher.method_defined?( shipper )

  self.log "Unknown destination kind %p for %p. WAL is queued." % [ dest.kind, dest.label ]
end
end
end
# Ship the WAL file!
#
# Only runs when this file is executed directly.  The configuration is looked
# up as etc/wal_shipper.conf next to the directory holding this script; if it
# is missing, the template stored after __END__ is written there first.  The
# WAL file to ship is taken from the first command line argument.
#
if $0 == __FILE__
  CONFIG_DIR = Pathname.new( __FILE__ ).dirname.parent + 'etc'
  CONFIG = CONFIG_DIR + 'wal_shipper.conf'

  if !CONFIG.exist?
    CONFIG_DIR.mkdir( 0711 ) if !CONFIG_DIR.exist?
    CONFIG.open( 'w' ) do |template|
      template.print( DATA.read )
    end
    CONFIG.chmod( 0644 )
    puts "No WAL shipping configuration found, default file created."
  end

  segment_arg = ARGV[0]
  raise "No WAL file was specified on the command line." unless segment_arg

  settings = YAML.load( CONFIG.read )

  shipper = WalShipper::Dispatcher.new( Pathname.new( segment_arg ), settings )
  shipper.link
  shipper.dispatch
  shipper.clean_spool
end
__END__
---
# Spool from pg_xlog to the working area?
# This must be set to 'true' for wal shipping to function!
enabled: false

# Log everything to the PostgreSQL log files?
debug: true

# The working area for WAL segments.
spool: /opt/local/var/db/postgresql84/wal

# With multiple slaves, ship WAL in parallel, or be synchronous?
async: false

# Put a ceiling on the parallel threads?
# '0' or removing this option uses a thread for each destination,
# regardless of how many you have. Keep in mind that's 16 * destination
# count megs of simultaneous bandwidth.
async_max: 5

# Paths and settings for various binaries.
rsync: /usr/bin/rsync
ssh: /usr/bin/ssh
ssh_timeout: 10

# One list entry per destination. The settings of an entry must be indented
# beneath its 'label' line so that they belong to that entry.
destinations:

- label: rsync-example
  port: 2222
  kind: rsync-ssh
  host: localhost
  user: postgres
  path: wal    # relative to the user's homedir on the remote host
  enabled: false

- label: file-example
  kind: file
  compress: true
  enabled: true
  path: /tmp/someplace