sql/functions/pgq.next_batch.sql
create or replace function pgq.next_batch_info(
in i_queue_name text,
in i_consumer_name text,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_info(2)
--
-- Makes next block of events active.
--
-- If it returns NULL, there is no events available in queue.
-- Consumer should sleep then.
--
-- The values from event_id sequence may give hint how big the
-- batch may be. But they are inexact, they do not give exact size.
-- Client *MUST NOT* use them to detect whether the batch contains any
-- events at all - the values are unfit for that purpose.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
--
-- Returns:
-- batch_id - Batch ID or NULL if there are no more events available.
-- cur_tick_id - End tick id.
-- cur_tick_time - End tick time.
-- cur_tick_event_seq - Value from event id sequence at the time tick was issued.
-- prev_tick_id - Start tick id.
-- prev_tick_time - Start tick time.
-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
-- pgq.next_batch_custom(5)
-- Tables directly manipulated:
-- None
-- ----------------------------------------------------------------------
begin
select f.batch_id, f.cur_tick_id, f.prev_tick_id,
f.cur_tick_time, f.prev_tick_time,
f.cur_tick_event_seq, f.prev_tick_event_seq
into batch_id, cur_tick_id, prev_tick_id, cur_tick_time, prev_tick_time,
cur_tick_event_seq, prev_tick_event_seq
from pgq.next_batch_custom(i_queue_name, i_consumer_name, NULL, NULL, NULL) f;
return;
end;
$$ language plpgsql;
create or replace function pgq.next_batch(
in i_queue_name text,
in i_consumer_name text)
returns int8 as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch(2)
--
-- Old function that returns just batch_id.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
--
-- Returns:
-- Batch ID or NULL if there are no more events available.
-- ----------------------------------------------------------------------
declare
res int8;
begin
select batch_id into res
from pgq.next_batch_info(i_queue_name, i_consumer_name);
return res;
end;
$$ language plpgsql;
create or replace function pgq.next_batch_custom(
in i_queue_name text,
in i_consumer_name text,
in i_min_lag interval,
in i_min_count int4,
in i_min_interval interval,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_custom(5)
--
-- Makes next block of events active. Block size can be tuned
-- with i_min_count, i_min_interval parameters. Events age can
-- be tuned with i_min_lag.
--
-- If it returns NULL, there is no events available in queue.
-- Consumer should sleep then.
--
-- The values from event_id sequence may give hint how big the
-- batch may be. But they are inexact, they do not give exact size.
-- Client *MUST NOT* use them to detect whether the batch contains any
-- events at all - the values are unfit for that purpose.
--
-- Note:
-- i_min_lag together with i_min_interval/i_min_count is inefficient.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
-- i_min_lag - Consumer wants events older than that
-- i_min_count - Consumer wants batch to contain at least this many events
-- i_min_interval - Consumer wants batch to cover at least this much time
--
-- Returns:
-- batch_id - Batch ID or NULL if there are no more events available.
-- cur_tick_id - End tick id.
-- cur_tick_time - End tick time.
-- cur_tick_event_seq - Value from event id sequence at the time tick was issued.
-- prev_tick_id - Start tick id.
-- prev_tick_time - Start tick time.
-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
-- pgq.insert_event_raw(11)
-- Tables directly manipulated:
-- update - pgq.subscription
-- ----------------------------------------------------------------------
declare
errmsg text;
queue_id integer;
sub_id integer;
cons_id integer;
begin
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
t1.tick_id, t1.tick_time, t1.tick_event_seq,
t2.tick_id, t2.tick_time, t2.tick_event_seq
into queue_id, cons_id, sub_id, batch_id,
prev_tick_id, prev_tick_time, prev_tick_event_seq,
cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.consumer c,
pgq.queue q,
pgq.subscription s
left join pgq.tick t1
on (t1.tick_queue = s.sub_queue
and t1.tick_id = s.sub_last_tick)
left join pgq.tick t2
on (t2.tick_queue = s.sub_queue
and t2.tick_id = s.sub_next_tick)
where q.queue_name = i_queue_name
and c.co_name = i_consumer_name
and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
|| '/'
|| coalesce(i_consumer_name, 'NULL');
raise exception '%', errmsg;
end if;
-- sanity check
if prev_tick_id is null then
raise exception 'PgQ corruption: Consumer % on queue % does not see tick %', i_consumer_name, i_queue_name, prev_tick_id;
end if;
-- has already active batch
if batch_id is not null then
return;
end if;
if i_min_interval is null and i_min_count is null then
-- find next tick
select tick_id, tick_time, tick_event_seq
into cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.tick
where tick_id > prev_tick_id
and tick_queue = queue_id
order by tick_queue asc, tick_id asc
limit 1;
else
-- find custom tick
select next_tick_id, next_tick_time, next_tick_seq
into cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.find_tick_helper(queue_id, prev_tick_id,
prev_tick_time, prev_tick_event_seq,
i_min_count, i_min_interval);
end if;
if i_min_lag is not null then
-- enforce min lag
if now() - cur_tick_time < i_min_lag then
cur_tick_id := NULL;
cur_tick_time := NULL;
cur_tick_event_seq := NULL;
end if;
end if;
if cur_tick_id is null then
-- nothing to do
prev_tick_id := null;
prev_tick_time := null;
prev_tick_event_seq := null;
return;
end if;
-- get next batch
batch_id := nextval('pgq.batch_id_seq');
update pgq.subscription
set sub_batch = batch_id,
sub_next_tick = cur_tick_id,
sub_active = now()
where sub_queue = queue_id
and sub_consumer = cons_id;
return;
end;
$$ language plpgsql security definer;