kafka_fdw
kafka_fdw : kafka Foreign Data Wrapper for CSV formatted messages
Overview
| ID | Extension | Package | Version | Category | License | Language |
|---|---|---|---|---|---|---|
| 8730 | kafka_fdw | kafka_fdw | 0.0.3 | FDW | PostgreSQL | C |
| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|---|---|---|---|---|---|---|
--s-d-r | No | Yes | No | Yes | yes | no |
| Relationships | |
|---|---|
| See Also | pgmq mongo_fdw redis_fdw wrappers multicorn redis hdfs_fdw wal2json |
Packages
| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.0.3 | 18 17 16 15 14 | kafka_fdw | - |
| RPM | PIGSTY | 0.0.3 | 18 17 16 15 14 | kafka_fdw_$v | - |
| DEB | PIGSTY | 0.0.3 | 18 17 16 15 14 | postgresql-$v-kafka-fdw | - |
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|---|---|---|---|---|---|
el8.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
el8.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
el9.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
el9.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
el10.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
el10.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
d12.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
d12.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
d13.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
d13.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
u22.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
u22.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
u24.x86_64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
u24.aarch64 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
Source
pig build pkg kafka_fdw; # build rpm/debInstall
Make sure PGDG and PIGSTY repo available:
pig repo add pgsql -u # add both repo and update cacheInstall this extension with pig:
pig install kafka_fdw; # install via package name, for the active PG version
pig install kafka_fdw -v 18; # install for PG 18
pig install kafka_fdw -v 17; # install for PG 17
pig install kafka_fdw -v 16; # install for PG 16
pig install kafka_fdw -v 15; # install for PG 15
pig install kafka_fdw -v 14; # install for PG 14Create this extension with:
CREATE EXTENSION kafka_fdw;Usage
kafka_fdw: Kafka Foreign Data Wrapper for CSV formatted messages
Create Server
CREATE EXTENSION kafka_fdw;
CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');Server Options: brokers (required, comma-separated Kafka broker endpoints).
Create User Mapping
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;Create Foreign Table (CSV Format)
CREATE FOREIGN TABLE kafka_csv (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');Two metadata columns are required: one with partition 'true' and one with offset 'true'. The remaining columns match the message format.
Table Options: format (csv or json), topic (Kafka topic name), batch_size, buffer_delay (milliseconds), strict (enforce strict schema validation), ignore_junk (set malformed columns to NULL).
Create Foreign Table (JSON Format)
CREATE FOREIGN TABLE kafka_json (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int OPTIONS (json 'int_val'),
some_text text OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');Use the json column option to map column names to JSON keys.
Consuming Messages
-- Read from a specific partition and offset
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;
-- Read from multiple partitions
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);Note: The offset keyword is reserved in SQL; use double quotes when referencing the offset column in some contexts.
Producing Messages
-- Insert with explicit partition
INSERT INTO kafka_csv (part, some_int, some_text)
VALUES (0, 42, 'hello from partition 0');
-- Insert with auto-partition selection
INSERT INTO kafka_csv (some_int, some_text)
VALUES (42, 'auto-partitioned message');