sql/functions/pgq.create_queue.sql
create or replace function pgq.create_queue(i_queue_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.create_queue(1)
--
-- Creates new queue with given name.
--
-- Returns:
-- 0 - queue already exists
-- 1 - queue created
-- Calls:
-- pgq.grant_perms(i_queue_name);
-- pgq.ticker(i_queue_name);
-- pgq.tune_storage(i_queue_name);
-- Tables directly manipulated:
-- insert - pgq.queue
-- create - pgq.event_N () inherits (pgq.event_template)
-- create - pgq.event_N_0 .. pgq.event_N_M () inherits (pgq.event_N)
-- ----------------------------------------------------------------------
declare
tblpfx text;
tblname text;
idxpfx text;
idxname text;
sql text;
id integer;
tick_seq text;
ev_seq text;
n_tables integer;
begin
if i_queue_name is null then
raise exception 'Invalid NULL value';
end if;
-- check if exists
perform 1 from pgq.queue where queue_name = i_queue_name;
if found then
return 0;
end if;
-- insert event
id := nextval('pgq.queue_queue_id_seq');
tblpfx := 'pgq.event_' || id::text;
idxpfx := 'event_' || id::text;
tick_seq := 'pgq.event_' || id::text || '_tick_seq';
ev_seq := 'pgq.event_' || id::text || '_id_seq';
insert into pgq.queue (queue_id, queue_name,
queue_data_pfx, queue_event_seq, queue_tick_seq)
values (id, i_queue_name, tblpfx, ev_seq, tick_seq);
select queue_ntables into n_tables from pgq.queue
where queue_id = id;
-- create seqs
execute 'CREATE SEQUENCE ' || pgq.quote_fqname(tick_seq);
execute 'CREATE SEQUENCE ' || pgq.quote_fqname(ev_seq);
-- create data tables
execute 'CREATE TABLE ' || pgq.quote_fqname(tblpfx) || ' () '
|| ' INHERITS (pgq.event_template)';
for i in 0 .. (n_tables - 1) loop
tblname := tblpfx || '_' || i::text;
idxname := idxpfx || '_' || i::text || '_txid_idx';
execute 'CREATE TABLE ' || pgq.quote_fqname(tblname) || ' () '
|| ' INHERITS (' || pgq.quote_fqname(tblpfx) || ')';
execute 'ALTER TABLE ' || pgq.quote_fqname(tblname) || ' ALTER COLUMN ev_id '
|| ' SET DEFAULT nextval(' || quote_literal(ev_seq) || ')';
execute 'create index ' || quote_ident(idxname) || ' on '
|| pgq.quote_fqname(tblname) || ' (ev_txid)';
end loop;
perform pgq.grant_perms(i_queue_name);
perform pgq.ticker(i_queue_name);
perform pgq.tune_storage(i_queue_name);
return 1;
end;
$$ language plpgsql security definer;