sql/functions/pgq.batch_event_tables.sql
create or replace function pgq.batch_event_tables(x_batch_id bigint)
returns setof text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.batch_event_tables(1)
--
--      Returns set of table names where this batch events may reside.
--
--      Each name is built as queue_data_pfx || '_' || <table number>.
--      When both tables qualify, the previous table (queue_cur_table - 1,
--      wrapping around to queue_ntables - 1) is returned before the
--      current one.
--
-- Parameters:
--      x_batch_id      - ID of an active batch.
--
-- Returns:
--      One or two table names, one per row.
--
-- Raises:
--      Exception 'Cannot find data for batch %' when no matching
--      subscription / tick / queue row exists for x_batch_id.
-- ----------------------------------------------------------------------
declare
    nr          integer;
    tbl         text;
    use_prev    boolean;
    use_next    boolean;
    batch       record;
begin
    -- Collect the txid range covered by the batch (xmin of the last tick
    -- snapshot .. xmax of the next tick snapshot) together with the
    -- table-rotation state of the queue.
    select
           txid_snapshot_xmin(last.tick_snapshot) as tx_min, -- absolute minimum
           txid_snapshot_xmax(cur.tick_snapshot) as tx_max, -- absolute maximum
           q.queue_data_pfx, q.queue_ntables,
           q.queue_cur_table, q.queue_switch_step1, q.queue_switch_step2
        into batch
        from pgq.subscription s
             join pgq.queue q
               on q.queue_id = s.sub_queue
             join pgq.tick cur
               on cur.tick_queue = s.sub_queue
              and cur.tick_id = s.sub_next_tick
             join pgq.tick last
               on last.tick_queue = s.sub_queue
              and last.tick_id = s.sub_last_tick
        where s.sub_batch = x_batch_id;
    if not found then
        raise exception 'Cannot find data for batch %', x_batch_id;
    end if;

    -- Unless the batch lies entirely on one side of the table switch,
    -- look into both tables.
    if batch.tx_max < batch.queue_switch_step1 then
        -- whole batch is older than switch step1: previous table only
        use_prev := true;
        use_next := false;
    elsif batch.queue_switch_step2 is not null
      and (batch.tx_min > batch.queue_switch_step2)
    then
        -- whole batch is newer than switch step2: current table only
        use_prev := false;
        use_next := true;
    else
        -- overlap, or queue_switch_step2 is NULL: cannot rule out either
        use_prev := true;
        use_next := true;
    end if;

    if use_prev then
        -- previous table number, wrapping around below zero
        nr := batch.queue_cur_table - 1;
        if nr < 0 then
            nr := batch.queue_ntables - 1;
        end if;
        tbl := batch.queue_data_pfx || '_' || nr::text;
        return next tbl;
    end if;

    if use_next then
        tbl := batch.queue_data_pfx || '_' || batch.queue_cur_table::text;
        return next tbl;
    end if;

    return;
end;
$$ language plpgsql; -- no perms needed