sql/functions/pgq.ticker.sql
create or replace function pgq.ticker(i_queue_name text, i_tick_id bigint, i_orig_timestamp timestamptz, i_event_seq bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(4)
--
--      External ticker: Insert a tick with a particular tick_id,
--      timestamp and event sequence value, all supplied by the caller.
--
--      The tick is accepted only for a queue that is flagged as having
--      an external ticker and whose ticker is not paused.  After the
--      insert, the queue's tick and event sequences are handed to
--      pgq.seq_setval() together with the supplied values.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_tick_id           - Id of new tick
--      i_orig_timestamp    - Value stored as tick_time of the new tick
--      i_event_seq         - Value stored as tick_event_seq of the new tick
--
-- Returns:
--      Tick id (the i_tick_id that was passed in).
--
-- Raises:
--      Exception if no row was inserted, i.e. the queue does not exist,
--      is not externally ticked, or its ticker is paused.
-- ----------------------------------------------------------------------
begin
    -- The queue lookup and the eligibility check are folded into the
    -- insert itself: a queue that does not qualify yields zero rows.
    insert into pgq.tick (tick_queue, tick_id, tick_time, tick_event_seq)
    select src.queue_id, i_tick_id, i_orig_timestamp, i_event_seq
      from pgq.queue src
     where src.queue_name = i_queue_name
       and src.queue_external_ticker
       and not src.queue_ticker_paused;

    if not found then
        raise exception 'queue not found or ticker disabled: %', i_queue_name;
    end if;

    -- make sure seqs stay current
    perform pgq.seq_setval(src.queue_tick_seq, i_tick_id),
            pgq.seq_setval(src.queue_event_seq, i_event_seq)
       from pgq.queue src
      where src.queue_name = i_queue_name;

    return i_tick_id;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker(i_queue_name text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(1)
--
--      Check if tick is needed for the queue and insert it.
--
--      For pgqadm usage.
--
--      Decision rules, evaluated against the most recent tick:
--
--      * No tick exists yet: a tick is inserted.
--      * New events since the last tick: the tick is skipped only while
--        both the number of new events is below queue_ticker_max_count
--        and the time since the last tick is below queue_ticker_max_lag.
--      * No new events: if an earlier tick exists, the tick is skipped
--        while the time since the last tick is below half of
--        queue_ticker_max_lag, or while it is below both twice the
--        interval between the two latest ticks and
--        queue_ticker_idle_period.  If there is no earlier tick,
--        a tick is inserted.
--
-- Parameters:
--      i_queue_name    - Name of the queue
--
-- Returns:
--      Tick id or NULL if no tick was done.
--
-- Raises:
--      Exception if the queue does not exist, has an external tick
--      source, has its ticker paused, or if the xmin of the last tick's
--      snapshot is ahead of the current transaction id.
-- ----------------------------------------------------------------------
declare
    res         bigint;     -- id of the freshly inserted tick
    q           record;     -- queue configuration + current event seq value
    state       record;     -- figures derived from the most recent tick
    last2       record;     -- interval between the two most recent ticks
begin
    -- queue settings, plus whatever pgq.seq_getval() reports for the
    -- queue's event sequence right now
    select queue_id, queue_tick_seq, queue_external_ticker,
           queue_ticker_max_count, queue_ticker_max_lag,
           queue_ticker_idle_period, queue_event_seq,
           pgq.seq_getval(queue_event_seq) as event_seq,
           queue_ticker_paused
      into q
      from pgq.queue where queue_name = i_queue_name;
    if not found then
        raise exception 'no such queue';
    end if;

    if q.queue_external_ticker then
        raise exception 'This queue has external tick source.';
    end if;

    if q.queue_ticker_paused then
        raise exception 'Ticker has been paused for this queue';
    end if;

    -- load state from last tick
    -- NOTE(review): ordering by tick_queue as well as tick_id presumably
    -- lets a (tick_queue, tick_id) index be walked backwards - confirm.
    select now() - tick_time as lag,
           q.event_seq - tick_event_seq as new_events,
           tick_id, tick_time, tick_event_seq,
           txid_snapshot_xmax(tick_snapshot) as sxmax,
           txid_snapshot_xmin(tick_snapshot) as sxmin
      into state
      from pgq.tick
     where tick_queue = q.queue_id
     order by tick_queue desc, tick_id desc
     limit 1;

    if found then
        -- sanity checks of the previous tick against the current txid
        if state.sxmin > txid_current() then
            raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%',
                            state.sxmin, state.sxmax, txid_current();
        end if;
        if state.new_events < 0 then
            raise warning 'Negative new_events?  old=% cur=%', state.tick_event_seq, q.event_seq;
        end if;
        if state.sxmax > txid_current() then
            raise warning 'Dubious PgQ state: old xmax=%, cur txid=%', state.sxmax, txid_current();
        end if;

        if state.new_events > 0 then
            -- there are new events, should we wait a bit?
            if state.new_events < q.queue_ticker_max_count
               and state.lag < q.queue_ticker_max_lag
            then
                return NULL;
            end if;
        else
            -- no new events, should we apply idle period?
            -- look at the tick just before the last one to learn how
            -- far apart the two latest ticks were.
            select state.tick_time - tick_time as lag
              into last2
              from pgq.tick
             where tick_queue = q.queue_id
               and tick_id < state.tick_id
             order by tick_queue desc, tick_id desc
             limit 1;
            if found then
                -- gradually decrease the tick frequency
                if (state.lag < q.queue_ticker_max_lag / 2)
                   or
                   (state.lag < last2.lag * 2
                    and state.lag < q.queue_ticker_idle_period)
                then
                    return NULL;
                end if;
            end if;
        end if;
    end if;

    -- tick_time is not listed, so the table's own default applies.
    -- RETURNING hands back the id that was actually stored, so no
    -- separate currval() lookup on the sequence is needed.
    insert into pgq.tick (tick_queue, tick_id, tick_event_seq)
    values (q.queue_id, nextval(q.queue_tick_seq), q.event_seq)
    returning tick_id into res;

    return res;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker() returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(0)
--
--      Creates ticks for all unpaused queues which dont have external ticker.
--
--      Queues are visited in queue_name order and each one is handed to
--      pgq.ticker(queue_name).  There is no exception handler here, so
--      an error raised for one queue aborts the whole call.
--
-- Returns:
--      Number of queues for which pgq.ticker(queue_name) returned a
--      positive tick id.  Queues where it returned NULL are not counted.
-- ----------------------------------------------------------------------
declare
    ticked_queues   bigint := 0;
    queue_row       record;
    new_tick_id     bigint;
begin
    for queue_row in
        select cfg.queue_name
          from pgq.queue cfg
         where not cfg.queue_external_ticker
           and not cfg.queue_ticker_paused
         order by cfg.queue_name
    loop
        new_tick_id := pgq.ticker(queue_row.queue_name);

        -- a NULL result makes the comparison NULL, which IF treats as false
        if new_tick_id > 0 then
            ticked_queues := ticked_queues + 1;
        end if;
    end loop;

    return ticked_queues;
end;
$$ language plpgsql security definer;