sql/functions/pgq.maint_rotate_tables.sql
create or replace function pgq.maint_rotate_tables_step1(i_queue_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.maint_rotate_tables_step1(1)
--
--      Rotate event tables for one queue.  When the queue's rotation
--      period has elapsed, the previous rotation has completed step2,
--      and no consumer may still be reading the previous table, the
--      next table in the ring is truncated and made current.
--
--      pgq.maint_rotate_tables_step2() must then be run (in a separate
--      transaction) before this queue becomes eligible for rotation
--      again, because this function resets queue_switch_step2 to NULL
--      and only considers queues where it is not NULL.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--
-- Returns:
--      0 in all cases, whether or not a rotation took place.
--
-- Raises:
--      exception if the current txid is lower than the stored
--      queue_switch_step1, or if the tick row referenced by the
--      slowest consumer is missing from pgq.tick.
-- ----------------------------------------------------------------------
declare
    cf              record;
    nr              integer;
    tbl             text;
    lowest_tick_id  int8;
    lowest_xmin     int8;
begin
    -- check if rotation is due and load (and row-lock) the queue record
    select * from pgq.queue into cf
     where queue_name = i_queue_name
       and queue_rotation_period is not null
       and queue_switch_step2 is not null
       and queue_switch_time + queue_rotation_period < current_timestamp
       for update;
    if not found then
        return 0;
    end if;

    -- current txid is below the one recorded at the previous rotation:
    -- DB is in an invalid state, stop
    if txid_current() < cf.queue_switch_step1 then
        raise exception 'queue % maint failure: step1=%, current=%',
                i_queue_name, cf.queue_switch_step1, txid_current();
    end if;

    -- find lowest tick for that queue
    select min(sub_last_tick) into lowest_tick_id
      from pgq.subscription
     where sub_queue = cf.queue_id;

    -- if some consumer exists
    if lowest_tick_id is not null then
        -- is the slowest one still on previous table?
        select txid_snapshot_xmin(tick_snapshot) into lowest_xmin
          from pgq.tick
         where tick_queue = cf.queue_id
           and tick_id = lowest_tick_id;
        if not found then
            raise exception 'queue % maint failure: tick % not found', i_queue_name, lowest_tick_id;
        end if;
        if lowest_xmin <= cf.queue_switch_step2 then
            return 0; -- skip rotation then
        end if;
    end if;

    -- nobody on previous table, we can rotate

    -- calc next table number and name, wrapping around the ring.
    -- '>=' rather than '=' so that a queue_ntables lowered below the
    -- current table number still wraps instead of addressing a
    -- non-existent table.
    nr := cf.queue_cur_table + 1;
    if nr >= cf.queue_ntables then
        nr := 0;
    end if;
    tbl := cf.queue_data_pfx || '_' || nr::text;

    -- there may be long lock on the table from pg_dump,
    -- detect it and skip rotate then
    begin
        execute 'lock table ' || pgq.quote_fqname(tbl) || ' nowait';
        execute 'truncate ' || pgq.quote_fqname(tbl);
    exception
        when lock_not_available then
            -- cannot truncate, skipping rotate
            return 0;
    end;

    -- remember the moment; step2 is cleared so that
    -- pgq.maint_rotate_tables_step2() fills it in later
    update pgq.queue
       set queue_cur_table = nr,
           queue_switch_time = current_timestamp,
           queue_switch_step1 = txid_current(),
           queue_switch_step2 = NULL
     where queue_id = cf.queue_id;

    -- Clean ticks by using step2 txid from previous rotation.
    -- That should keep all ticks for all batches that are completely
    -- in old table.  This keeps them for longer than needed, but:
    -- 1. we want the pgq.tick table to be big, to avoid Postgres
    --    accidentally switching to seqscans on that.
    -- 2. that way we guarantee to consumers that they can be moved
    --    back on the queue at least for one rotation_period.
    --    (may help in disaster recovery)
    delete from pgq.tick
     where tick_queue = cf.queue_id
       and txid_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2;

    return 0;
end;
$$ language plpgsql; -- need admin access
create or replace function pgq.maint_rotate_tables_step2()
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.maint_rotate_tables_step2(0)
--
--      Record the txid at which the most recent rotation became
--      visible: every queue whose queue_switch_step2 is still NULL
--      gets it set to the current txid.
--
--      It should be called in a separate transaction from
--      pgq.maint_rotate_tables_step1().
--
-- Returns:
--      0
-- ----------------------------------------------------------------------
begin
    update pgq.queue as q
       set queue_switch_step2 = txid_current()
     where q.queue_switch_step2 is null;

    return 0;
end;
$$ language plpgsql; -- need admin access