pgq

pgq: Generic queue for PostgreSQL

Overview

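| ID | Extension | Package | Version | Category | License | Language |
|----|-----------|---------|---------|----------|---------|----------|
| 2890 | pgq | pgq | 3.5.1 | FEAT | ISC | C |

| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|-----------|------------|-------------|-----------|---------|-------------|---------|
| --s-d-- | No | Yes | No | Yes | no | no |
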
Relationships

Schemas: pg_catalog
See Also: age, hll, rum, pg_graphql, pg_jsonschema, jsquery, pg_hint_plan, hypopg

Packages

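| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|------|------|---------|------------------------|-----------------|--------------|
| EXT | PGDG | 3.5.1 | 18, 17, 16, 15, 14 | pgq | - |
| RPM | PGDG | 3.5.1 | 18, 17, 16, 15, 14 | pgq_$v | - |
| DEB | PGDG | 3.5.1 | 18, 17, 16, 15, 14 | postgresql-$v-pgq3 | - |
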
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|------------|------|------|------|------|------|
| el8.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| el8.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| el9.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| el9.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| el10.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| el10.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| d12.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| d12.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| d13.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| d13.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| u22.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| u22.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| u24.x86_64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |
| u24.aarch64 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 | PGDG 3.5.1 |

| Package | Version | OS | Org | Size | File |
|---------|---------|----|-----|------|------|
| pgq_14 | 3.5.1 | el8.x86_64 | pgdg | 55.3 KiB | pgq_14-3.5.1-1PGDG.rhel8.x86_64.rpm |
| pgq_14 | 3.5 | el8.x86_64 | pgdg | 54.8 KiB | pgq_14-3.5-1.rhel8.x86_64.rpm |
| pgq_14 | 3.4.2 | el8.x86_64 | pgdg | 54.4 KiB | pgq_14-3.4.2-1.rhel8.x86_64.rpm |
| pgq_14 | 3.4.1 | el8.x86_64 | pgdg | 108.6 KiB | pgq_14-3.4.1-2.rhel8.x86_64.rpm |
| pgq_14 | 3.5.1 | el8.aarch64 | pgdg | 55.5 KiB | pgq_14-3.5.1-1PGDG.rhel8.aarch64.rpm |
| pgq_14 | 3.5 | el8.aarch64 | pgdg | 55.0 KiB | pgq_14-3.5-1.rhel8.aarch64.rpm |
| pgq_14 | 3.5.1 | el9.x86_64 | pgdg | 53.5 KiB | pgq_14-3.5.1-1PGDG.rhel9.x86_64.rpm |
| pgq_14 | 3.5 | el9.x86_64 | pgdg | 53.3 KiB | pgq_14-3.5-1.rhel9.x86_64.rpm |
| pgq_14 | 3.4.2 | el9.x86_64 | pgdg | 107.8 KiB | pgq_14-3.4.2-1.rhel9.x86_64.rpm |
| pgq_14 | 3.5.1 | el9.aarch64 | pgdg | 53.3 KiB | pgq_14-3.5.1-1PGDG.rhel9.aarch64.rpm |
| pgq_14 | 3.5 | el9.aarch64 | pgdg | 53.1 KiB | pgq_14-3.5-1.rhel9.aarch64.rpm |
| pgq_14 | 3.5.1 | el10.x86_64 | pgdg | 53.2 KiB | pgq_14-3.5.1-4PGDG.rhel10.x86_64.rpm |
| pgq_14 | 3.5.1 | el10.aarch64 | pgdg | 52.9 KiB | pgq_14-3.5.1-4PGDG.rhel10.aarch64.rpm |
| postgresql-14-pgq3 | 3.5.1 | d12.x86_64 | pgdg | 124.1 KiB | postgresql-14-pgq3_3.5.1-2.pgdg12+1_amd64.deb |
| postgresql-14-pgq3 | 3.5.1 | d12.aarch64 | pgdg | 123.0 KiB | postgresql-14-pgq3_3.5.1-2.pgdg12+1_arm64.deb |
| postgresql-14-pgq3 | 3.5.1 | d13.x86_64 | pgdg | 124.0 KiB | postgresql-14-pgq3_3.5.1-2.pgdg13+1_amd64.deb |
| postgresql-14-pgq3 | 3.5.1 | d13.aarch64 | pgdg | 123.1 KiB | postgresql-14-pgq3_3.5.1-2.pgdg13+1_arm64.deb |
| postgresql-14-pgq3 | 3.5.1 | u22.x86_64 | pgdg | 134.8 KiB | postgresql-14-pgq3_3.5.1-2.pgdg22.04+1_amd64.deb |
| postgresql-14-pgq3 | 3.5.1 | u22.aarch64 | pgdg | 133.5 KiB | postgresql-14-pgq3_3.5.1-2.pgdg22.04+1_arm64.deb |
| postgresql-14-pgq3 | 3.5.1 | u24.x86_64 | pgdg | 124.0 KiB | postgresql-14-pgq3_3.5.1-2.pgdg24.04+1_amd64.deb |
| postgresql-14-pgq3 | 3.5.1 | u24.aarch64 | pgdg | 122.9 KiB | postgresql-14-pgq3_3.5.1-2.pgdg24.04+1_arm64.deb |

Install

Make sure the PGDG repo is available:

pig repo add pgdg -u    # add pgdg repo and update cache

Install this extension with pig:

pig install pgq;         # install via package name, for the active PG version

pig install pgq -v 18;   # install for PG 18
pig install pgq -v 17;   # install for PG 17
pig install pgq -v 16;   # install for PG 16
pig install pgq -v 15;   # install for PG 15
pig install pgq -v 14;   # install for PG 14

Create this extension with:

CREATE EXTENSION pgq;

Usage

pgq: Generic high-performance lockless queue for PostgreSQL

PgQ is a PostgreSQL extension that provides a generic, high-performance lockless queue with a simple SQL function API. It uses a producer-consumer model with batch-based event processing.

CREATE EXTENSION pgq;

Core Concepts

  • Queue: A named event stream. Events are inserted by producers and consumed in batches.
  • Consumer: A named subscriber registered to a queue. Each consumer tracks its own position.
  • Batch: A group of events retrieved together. Consumers process events batch by batch.
  • Ticker: A background process that creates batch boundaries (ticks) at regular intervals.
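
The relationship between these four concepts can be illustrated with a toy in-memory model. This is only a sketch for intuition: `ToyQueue` and its methods are hypothetical names, nothing here reflects how PgQ stores events internally, and it assumes a newly registered consumer starts at the latest tick.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """In-memory illustration only; NOT how PgQ is implemented."""
    events: list = field(default_factory=list)
    # Each tick records how many events existed when the ticker fired.
    ticks: list = field(default_factory=lambda: [0])
    # consumer name -> index into `ticks` (the consumer's own position)
    positions: dict = field(default_factory=dict)

    def insert_event(self, ev_type, ev_data):
        """Producer side: append an event to the stream."""
        self.events.append((ev_type, ev_data))

    def tick(self):
        """Ticker: record a batch boundary at the current end of the stream."""
        self.ticks.append(len(self.events))

    def register_consumer(self, name):
        # Assumption: a new consumer starts at the latest tick.
        self.positions.setdefault(name, len(self.ticks) - 1)

    def next_batch(self, name):
        """Return the events between the consumer's tick and the next, or None."""
        pos = self.positions[name]
        if pos + 1 >= len(self.ticks):
            return None  # no newer tick yet, so no batch to hand out
        return self.events[self.ticks[pos]:self.ticks[pos + 1]]

    def finish_batch(self, name):
        """Advance the consumer past the batch it just processed."""
        if self.positions[name] + 1 < len(self.ticks):
            self.positions[name] += 1
```

In this model an inserted event stays invisible to consumers until `tick()` is called, and each consumer advances independently because it keeps its own position.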

Queue Management

-- Create a queue
SELECT pgq.create_queue('myqueue');

-- Drop a queue
SELECT pgq.drop_queue('myqueue');

-- Get queue info
SELECT * FROM pgq.get_queue_info();
SELECT * FROM pgq.get_queue_info('myqueue');

Consumer Registration

-- Register a consumer on a queue
SELECT pgq.register_consumer('myqueue', 'myconsumer');

-- Unregister a consumer
SELECT pgq.unregister_consumer('myqueue', 'myconsumer');

-- Get consumer info
SELECT * FROM pgq.get_consumer_info('myqueue');
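
From application code, queue creation and consumer registration are often done together at startup. The following is a minimal sketch, assuming `cur` is a Python DB-API cursor that uses `%s` placeholders (as psycopg does); `setup_queue` is a hypothetical helper name, and only the SQL calls shown above are issued.

```python
def setup_queue(cur, queue, consumers):
    """Create `queue` and register each consumer on it.

    `cur` is assumed to be a DB-API cursor with %s-style parameters.
    Committing the transaction is left to the caller.
    """
    cur.execute("SELECT pgq.create_queue(%s)", (queue,))
    for consumer in consumers:
        cur.execute("SELECT pgq.register_consumer(%s, %s)", (queue, consumer))
```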

Producing Events

-- Insert an event into a queue
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data');

-- Insert with extra fields
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data',
                         'extra1', 'extra2', 'extra3', 'extra4');
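
Event data is passed as text, so structured payloads need to be serialized by the producer. Below is a minimal sketch of a producer helper, assuming a DB-API cursor with `%s` placeholders and JSON as the encoding; `insert_json_event` is a hypothetical name, and treating the returned value as the new event id is an assumption not stated above.

```python
import json


def insert_json_event(cur, queue, ev_type, payload):
    """Insert one event whose data is `payload` encoded as JSON text.

    Returns the first column of the result row, assumed here to be
    the id of the newly inserted event.
    """
    cur.execute(
        "SELECT pgq.insert_event(%s, %s, %s)",
        (queue, ev_type, json.dumps(payload, sort_keys=True)),
    )
    return cur.fetchone()[0]
```

Passing values as query parameters, rather than formatting them into the SQL string, avoids quoting problems in the event data.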

Consuming Events

-- Get the next batch of events (returns batch_id or NULL if no new batches)
SELECT pgq.next_batch('myqueue', 'myconsumer');

-- Get events from the batch
SELECT * FROM pgq.get_batch_events(:batch_id);

-- Retry a failed event (will reappear after the specified interval)
SELECT pgq.event_retry(:batch_id, :event_id, :retry_seconds);

-- Mark batch as done
SELECT pgq.finish_batch(:batch_id);

Typical Consumer Loop

-- 1. Get next batch
SELECT pgq.next_batch('myqueue', 'myconsumer') AS batch_id;

-- 2. If batch_id is not NULL, get events
SELECT * FROM pgq.get_batch_events(:batch_id);

-- 3. Process events, retry failures
SELECT pgq.event_retry(:batch_id, :event_id, 60);

-- 4. Finish the batch
SELECT pgq.finish_batch(:batch_id);
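
The four steps above can be sketched as a single function in a host language. This is an illustration under stated assumptions, not a reference consumer: `cur` is assumed to be a DB-API cursor with `%s` placeholders, `process_next_batch` and `handler` are hypothetical names, and the column names `ev_id`, `ev_type`, and `ev_data` are assumed for the rows returned by `pgq.get_batch_events`.

```python
def process_next_batch(cur, queue, consumer, handler, retry_seconds=60):
    """Process at most one batch.

    Returns the number of events in the batch, or None when
    pgq.next_batch reports that no batch is available.
    """
    # 1. Get the next batch id (NULL / None means nothing to do yet).
    cur.execute("SELECT pgq.next_batch(%s, %s)", (queue, consumer))
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        return None

    # 2. Fetch the events of this batch (column names are an assumption).
    cur.execute(
        "SELECT ev_id, ev_type, ev_data FROM pgq.get_batch_events(%s)",
        (batch_id,),
    )
    events = cur.fetchall()

    # 3. Process each event; schedule a retry for the ones that fail.
    for event in events:
        try:
            handler(event)
        except Exception:
            cur.execute(
                "SELECT pgq.event_retry(%s, %s, %s)",
                (batch_id, event[0], retry_seconds),
            )

    # 4. Mark the batch as done.
    cur.execute("SELECT pgq.finish_batch(%s)", (batch_id,))
    return len(events)
```

The caller is expected to commit after the function returns. If `handler` runs its own SQL on the same connection, a failed statement may leave the transaction aborted, in which case the retry call would also fail; handlers that touch the database would need savepoints or a separate connection.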

Maintenance

PgQ requires a ticker daemon (pgqd) running in the background. The ticker creates batch boundaries and performs maintenance tasks such as table rotation and retry-event processing.
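
Because new batches only become available once the ticker has produced a tick, a consumer typically polls and sleeps whenever no batch is returned. The sketch below shows one possible polling loop; `poll_batches`, `process_once`, and the parameter names are hypothetical, and `process_once` is assumed to return None when `pgq.next_batch` yields NULL.

```python
import time


def poll_batches(process_once, idle_sleep=1.0, max_idle_polls=None,
                 sleep=time.sleep):
    """Call process_once() repeatedly, sleeping when no batch is available.

    Stops after `max_idle_polls` consecutive empty polls (runs forever
    when it is None) and returns the number of batches handled.
    """
    idle = 0
    handled = 0
    while max_idle_polls is None or idle < max_idle_polls:
        if process_once() is None:
            idle += 1          # nothing to do: wait for the next tick
            sleep(idle_sleep)
        else:
            idle = 0           # got a batch: poll again immediately
            handled += 1
    return handled
```

The `sleep` parameter exists only so the loop can be exercised without real delays; in normal use the default `time.sleep` applies.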

Key Functions

| Function | Description |
|----------|-------------|
| pgq.create_queue(name) | Create a new queue |
| pgq.drop_queue(name) | Remove a queue |
| pgq.register_consumer(queue, consumer) | Register a consumer |
| pgq.unregister_consumer(queue, consumer) | Unregister a consumer |
| pgq.insert_event(queue, type, data, ...) | Insert an event |
| pgq.next_batch(queue, consumer) | Get next batch ID |
| pgq.get_batch_events(batch_id) | Get events from a batch |
| pgq.event_retry(batch_id, event_id, seconds) | Schedule event retry |
| pgq.finish_batch(batch_id) | Mark batch as processed |
| pgq.get_queue_info([name]) | Get queue statistics |
| pgq.get_consumer_info(queue) | Get consumer statistics |